diff --git a/Dockerfile b/Dockerfile index a70bf5d3..f3891f68 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20-alpine +FROM golang:1.20-alpine3.18 ENV SRC github.com/segmentio/ctlstore ARG VERSION diff --git a/pkg/changelog/changelog_writer.go b/pkg/changelog/changelog_writer.go index e3c2fba4..cb58e322 100644 --- a/pkg/changelog/changelog_writer.go +++ b/pkg/changelog/changelog_writer.go @@ -2,8 +2,8 @@ package changelog import ( "encoding/json" - "github.com/pkg/errors" + "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/events/v2" ) @@ -16,25 +16,27 @@ type ( WriteLine WriteLine } ChangelogEntry struct { - Seq int64 - Family string - Table string - Key []interface{} + Seq int64 + Family string + Table string + Key []interface{} + LedgerSeq schema.DMLSequence + Transaction bool } ) -func NewChangelogEntry(seq int64, family string, table string, key []interface{}) *ChangelogEntry { - return &ChangelogEntry{Seq: seq, Family: family, Table: table, Key: key} -} - func (w *ChangelogWriter) WriteChange(e ChangelogEntry) error { structure := struct { - Seq int64 `json:"seq"` - Family string `json:"family"` - Table string `json:"table"` - Key []interface{} `json:"key"` + Seq int64 `json:"seq"` + LedgerSeq int64 `json:"ledgerSeq"` + Transaction bool `json:"tx"` + Family string `json:"family"` + Table string `json:"table"` + Key []interface{} `json:"key"` }{ e.Seq, + e.LedgerSeq.Int(), + e.Transaction, e.Family, e.Table, e.Key, diff --git a/pkg/changelog/changelog_writer_test.go b/pkg/changelog/changelog_writer_test.go index bf70a948..7ea7e2f4 100644 --- a/pkg/changelog/changelog_writer_test.go +++ b/pkg/changelog/changelog_writer_test.go @@ -32,5 +32,5 @@ func TestWriteChange(t *testing.T) { }) require.NoError(t, err) require.EqualValues(t, 1, len(mock.Lines)) - require.Equal(t, `{"seq":42,"family":"family1","table":"table1","key":[18014398509481984,"foo"]}`, mock.Lines[0]) + require.Equal(t, 
`{"seq":42,"ledgerSeq":0,"tx":false,"family":"family1","table":"table1","key":[18014398509481984,"foo"]}`, mock.Lines[0]) } diff --git a/pkg/cmd/ctlstore/main.go b/pkg/cmd/ctlstore/main.go index 31bbb99a..dee8be11 100644 --- a/pkg/cmd/ctlstore/main.go +++ b/pkg/cmd/ctlstore/main.go @@ -114,7 +114,7 @@ type supervisorCliConfig struct { type ledgerHealthConfig struct { Disable bool `conf:"disable" help:"disable ledger latency health attributing (DEPRECATED: use disable-ecs-behavior instead)"` DisableECSBehavior bool `conf:"disable-ecs-behavior" help:"disable ledger latency health attributing"` - MaxHealthyLatency time.Duration `conf:"max-healty-latency" help:"Max latency considered healthy"` + MaxHealthyLatency time.Duration `conf:"max-healthy-latency" help:"Max latency considered healthy"` AttributeName string `conf:"attribute-name" help:"The name of the attribute"` HealthyAttributeValue string `conf:"healthy-attribute-value" help:"The value of the attribute if healthy"` UnhealthyAttributeValue string `conf:"unhealth-attribute-value" help:"The value of the attribute if unhealthy"` diff --git a/pkg/event/changelog_test.go b/pkg/event/changelog_test.go index 3e813caa..3aa7b13e 100644 --- a/pkg/event/changelog_test.go +++ b/pkg/event/changelog_test.go @@ -135,7 +135,7 @@ func TestChangelog(t *testing.T) { numEvents: 10000, rotateAfterBytes: 1024 * 128, writeDelay: 100 * time.Microsecond, - mustRotateN: 8, + mustRotateN: 9, }, } { t.Run(test.name, func(t *testing.T) { diff --git a/pkg/event/entry.go b/pkg/event/entry.go index e175153d..b12e7c23 100644 --- a/pkg/event/entry.go +++ b/pkg/event/entry.go @@ -2,18 +2,23 @@ package event // entry represents a single row in the changelog // e.g. 
-// {"seq":1,"family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} +// +// {"seq":1,"ledgerSeq":42,"tx":false,"family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} type entry struct { - Seq int64 `json:"seq"` - Family string `json:"family"` - Table string `json:"table"` - Key []Key `json:"key"` + Seq int64 `json:"seq"` + LedgerSeq int64 `json:"ledgerSeq"` + Transaction bool `json:"tx"` + Family string `json:"family"` + Table string `json:"table"` + Key []Key `json:"key"` } // event converts the entry into an event for the iterator to return func (e entry) event() Event { return Event{ - Sequence: e.Seq, + Sequence: e.Seq, + LedgerSequence: e.LedgerSeq, + Transaction: e.Transaction, RowUpdate: RowUpdate{ FamilyName: e.Family, TableName: e.Table, diff --git a/pkg/event/event.go b/pkg/event/event.go index 4cfc2d7d..0acef6b4 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -2,8 +2,10 @@ package event // Event is the type that the Iterator produces type Event struct { - Sequence int64 - RowUpdate RowUpdate + Sequence int64 + LedgerSequence int64 + Transaction bool + RowUpdate RowUpdate } // RowUpdate represents a single row update diff --git a/pkg/executive/db_executive.go b/pkg/executive/db_executive.go index fc2bbf4f..07066d17 100644 --- a/pkg/executive/db_executive.go +++ b/pkg/executive/db_executive.go @@ -319,7 +319,7 @@ func (e *dbExecutive) AddFields(familyName string, tableName string, fieldNames } // We first write the column modification to the DML ledger within the transaction. - // It's important that this is done befored the DDL is applied to the ctldb, as + // It's important that this is done before the DDL is applied to the ctldb, as // the DDL is not able to be rolled back. In this way, if the DDL fails, the DML // can be rolled back. 
dlw := dmlLedgerWriter{Tx: tx, TableName: dmlLedgerTableName} diff --git a/pkg/ldb/ldbs.go b/pkg/ldb/ldbs.go index 9fca941d..6e7b38f0 100644 --- a/pkg/ldb/ldbs.go +++ b/pkg/ldb/ldbs.go @@ -121,3 +121,7 @@ func FetchSeqFromLdb(ctx context.Context, db *sql.DB) (schema.DMLSequence, error } return schema.DMLSequence(seq), err } + +func IsInternalTable(name string) bool { + return name == LDBSeqTableName || name == LDBLastUpdateTableName +} diff --git a/pkg/ldbwriter/changelog_callback.go b/pkg/ldbwriter/changelog_callback.go index be6d58fb..17559f21 100644 --- a/pkg/ldbwriter/changelog_callback.go +++ b/pkg/ldbwriter/changelog_callback.go @@ -37,10 +37,12 @@ func (c *ChangelogCallback) LDBWritten(ctx context.Context, data LDBWriteMetadat for _, key := range keys { seq := atomic.AddInt64(&c.Seq, 1) err = c.ChangelogWriter.WriteChange(changelog.ChangelogEntry{ - Seq: seq, - Family: fam.Name, - Table: tbl.Name, - Key: key, + Seq: seq, + LedgerSeq: change.LedgerSequence, + Transaction: data.Transaction, + Family: fam.Name, + Table: tbl.Name, + Key: key, }) if err != nil { events.Log("Skipped logging change to %{family}s.%{table}s:%{key}v: %{err}v", diff --git a/pkg/ldbwriter/ldb_callback_writer.go b/pkg/ldbwriter/ldb_callback_writer.go index ad1a4148..ce4464ab 100644 --- a/pkg/ldbwriter/ldb_callback_writer.go +++ b/pkg/ldbwriter/ldb_callback_writer.go @@ -3,33 +3,101 @@ package ldbwriter import ( "context" "database/sql" - "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/ctlstore/pkg/sqlite" "github.com/segmentio/events/v2" + "github.com/segmentio/stats/v4" ) // CallbackWriter is an LDBWriter that delegates to another // writer and then, upon a successful write, executes N callbacks. 
type CallbackWriter struct { - DB *sql.DB - Delegate LDBWriter - Callbacks []LDBWriteCallback + DB *sql.DB + Delegate LDBWriter + Callbacks []LDBWriteCallback + // Buffer between SQLite preupdate Hook and this code ChangeBuffer *sqlite.SQLChangeBuffer + // Accumulated changes across multiple ApplyDMLStatement calls + transactionChanges []sqlite.SQLiteWatchChange +} + +func (w *CallbackWriter) inTransaction() bool { + return w.transactionChanges != nil +} + +func (w *CallbackWriter) beginTransaction(ledgerSequence schema.DMLSequence) { + if len(w.transactionChanges) > 0 { + // This should never happen, but just in case... + stats.Add("ldb_changes_abandoned", len(w.transactionChanges)) + events.Log("error: abandoned %{count}d changes from incomplete transaction, current statement's ledger sequence: %{sequence}d", + len(w.transactionChanges), ledgerSequence) + } + w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0) +} + +// Transaction done! Return the accumulated changes including the latest ones +func (w *CallbackWriter) endTransaction(changes []sqlite.SQLiteWatchChange) (transactionChanges []sqlite.SQLiteWatchChange) { + w.accumulateChanges(changes) + transactionChanges = w.transactionChanges + w.transactionChanges = nil + return +} + +// Transaction isn't over yet, save the latest changes +func (w *CallbackWriter) accumulateChanges(changes []sqlite.SQLiteWatchChange) { + w.transactionChanges = append(w.transactionChanges, changes...) } +// ApplyDMLStatement +// +// It is not obvious, but this code executes synchronously: +// 1. Delegate.ApplyDMLStatement executes the DML statement against the SQLite LDB. +// (⚠️ WARNING: That's what the code is wired up to do today, January 2024, though the Delegate +// could be doing other things since the code is so flexible.) +// 2. When SQLite processes the statement it invokes our preupdate hook (see sqlite_watch.go). +// 3. Our preupdate hook writes the changes to the change buffer. +// 4. 
The code returns here, and we decide whether to process the change buffer immediately or +// wait until the end of the ledger transaction. func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema.DMLStatement) error { err := w.Delegate.ApplyDMLStatement(ctx, statement) if err != nil { return err } + + // If beginning a transaction then start accumulating changes, don't send them out yet + if statement.Statement == schema.DMLTxBeginKey { + w.beginTransaction(statement.Sequence) + return nil + } + changes := w.ChangeBuffer.Pop() + + // Record the responsible ledger sequence in each change so that the callback can use it + for i := range changes { + changes[i].LedgerSequence = statement.Sequence + } + + var transaction bool + if w.inTransaction() { + transaction = true + if statement.Statement == schema.DMLTxEndKey { + // Transaction done, let's send what we have accumulated + changes = w.endTransaction(changes) + } else { + // Transaction not over, continue accumulating + w.accumulateChanges(changes) + return nil + } + } + + stats.Observe("ldb_changes_written", len(changes)) for _, callback := range w.Callbacks { - events.Debug("Writing DML callback for %{cb}T", callback) + events.Debug("Writing DML callback for %{cb}T with %{changeCount}d changes", callback, len(changes)) callback.LDBWritten(ctx, LDBWriteMetadata{ - DB: w.DB, - Statement: statement, - Changes: changes, + DB: w.DB, + Statement: statement, + Changes: changes, + Transaction: transaction, }) } return nil diff --git a/pkg/ldbwriter/ldb_callback_writer_test.go b/pkg/ldbwriter/ldb_callback_writer_test.go new file mode 100644 index 00000000..e57c23dc --- /dev/null +++ b/pkg/ldbwriter/ldb_callback_writer_test.go @@ -0,0 +1,204 @@ +package ldbwriter + +import ( + "context" + "database/sql" + "github.com/segmentio/ctlstore/pkg/ldb" + "github.com/segmentio/ctlstore/pkg/schema" + "github.com/segmentio/ctlstore/pkg/sqlite" + "github.com/stretchr/testify/assert" + "testing" +) + +/* + * 
Simple LDBWriteCallback handler that just stores the changes it gets. + */ +type TestUpdateCallbackHandler struct { + Changes []sqlite.SQLiteWatchChange +} + +// Record the changes propagated from ApplyDMLStatement +func (u *TestUpdateCallbackHandler) LDBWritten(ctx context.Context, data LDBWriteMetadata) { + // The [:0] slice operation will reuse the underlying array of u.Changes if it's large enough + // to hold all elements of data.Changes, otherwise it will allocate a new one. + u.Changes = append(u.Changes[:0], data.Changes...) +} + +func (u *TestUpdateCallbackHandler) UpdateCount() int { + return len(u.Changes) +} + +func (u *TestUpdateCallbackHandler) Reset() { + u.Changes = u.Changes[:0] + return +} + +/* + * Test strategy: + * Check how many times we get callbacks while applying DML statements, + * and how many updates we get per callback. + */ +func TestCallbackWriter_ApplyDMLStatement(t *testing.T) { + // Begin boilerplate + var err error + ctx := context.Background() + var changeBuffer sqlite.SQLChangeBuffer + dbName := "test_ldb_callback_writer" + _ = sqlite.RegisterSQLiteWatch(dbName, &changeBuffer) + + db, err := sql.Open(dbName, ":memory:") + if err != nil { + t.Fatalf("Unexpected error: %+v", err) + } + defer db.Close() + + err = ldb.EnsureLdbInitialized(ctx, db) + if err != nil { + t.Fatalf("Couldn't initialize SQLite db, error %v", err) + } + // End boilerplate + + // Set up the callback writer with our test callback handler + ldbWriteCallback := &TestUpdateCallbackHandler{} + + writer := CallbackWriter{ + DB: db, + Delegate: &SqlLdbWriter{Db: db}, + Callbacks: []LDBWriteCallback{ldbWriteCallback}, + ChangeBuffer: &changeBuffer, + } + + err = writer.ApplyDMLStatement(ctx, schema.NewTestDMLStatement("CREATE TABLE foo (bar VARCHAR);")) + if err != nil { + t.Fatalf("Could not issue CREATE TABLE statements, error %v", err) + } + + type args struct { + ctx context.Context + statements []schema.DMLStatement + } + tests := []struct { + name string + 
args args + expectedCallbacks int + expectedUpdatesPerCallback int + wantErr bool + }{ + { + name: "Test one bare statement", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{schema.NewTestDMLStatement("INSERT INTO foo VALUES('dummy');")}, + }, + expectedCallbacks: 1, + expectedUpdatesPerCallback: 1, + wantErr: false, + }, + { + name: "Test three bare statements", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{ + schema.NewTestDMLStatement("INSERT INTO foo VALUES('boston');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('detroit');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('chicago');"), + }, + }, + // bare statements outside of a transaction should get a callback each time + expectedCallbacks: 3, + expectedUpdatesPerCallback: 1, + wantErr: false, + }, + { + name: "Test three statements in a ledger transaction", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{ + schema.NewTestDMLStatement(schema.DMLTxBeginKey), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('asdf');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('foo');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('bar');"), + schema.NewTestDMLStatement(schema.DMLTxEndKey), + }, + }, + // since it's a transaction, we expect only one callback, and it should have all 3 updates + expectedCallbacks: 1, + expectedUpdatesPerCallback: 3, + wantErr: false, + }, + { + name: "Test two statements in a ledger transaction", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{ + schema.NewTestDMLStatement(schema.DMLTxBeginKey), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('blue');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('green');"), + schema.NewTestDMLStatement(schema.DMLTxEndKey), + }, + }, + // since it's a transaction, we expect only one callback, and it should have all 2 updates + expectedCallbacks: 1, + expectedUpdatesPerCallback: 2, + wantErr: false, + }, + { + name: "Test four statements in a ledger 
transaction, using REPLACE INTO", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{ + schema.NewTestDMLStatement(schema.DMLTxBeginKey), + schema.NewTestDMLStatement("REPLACE INTO foo('bar') VALUES('lion');"), + schema.NewTestDMLStatement("REPLACE INTO foo('bar') VALUES('green');"), + schema.NewTestDMLStatement("REPLACE INTO foo('bar') VALUES('boston');"), + schema.NewTestDMLStatement("REPLACE INTO foo('bar') VALUES('detroit')"), + schema.NewTestDMLStatement(schema.DMLTxEndKey), + }, + }, + // since it's a transaction, we expect only one callback, and it should have all 4 updates + expectedCallbacks: 1, + expectedUpdatesPerCallback: 4, + wantErr: false, + }, + { + name: "Test six statements in two ledger transactions", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{ + schema.NewTestDMLStatement(schema.DMLTxBeginKey), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('cat');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('dog');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('hamster');"), + schema.NewTestDMLStatement(schema.DMLTxEndKey), + schema.NewTestDMLStatement(schema.DMLTxBeginKey), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('fish');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('rabbit');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('ferret');"), + schema.NewTestDMLStatement(schema.DMLTxEndKey), + }, + }, + // since these are transactions, we expect two callbacks, and each should have all 3 updates + expectedCallbacks: 2, + expectedUpdatesPerCallback: 3, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + callbackCount := 0 + for _, statement := range tt.args.statements { + if err := writer.ApplyDMLStatement(tt.args.ctx, statement); (err != nil) != tt.wantErr { + t.Errorf("ApplyDMLStatement() error = %v, wantErr %v", err, tt.wantErr) + } + // did we get a callback from that statement being applied? 
+ if ldbWriteCallback.UpdateCount() > 0 { + callbackCount++ + assert.Equal(t, tt.expectedUpdatesPerCallback, ldbWriteCallback.UpdateCount()) + // delete previous callback's update entries since we "handled" the callback + ldbWriteCallback.Reset() + } + } + assert.Equal(t, tt.expectedCallbacks, callbackCount) + }) + } +} diff --git a/pkg/ldbwriter/ldb_writer.go b/pkg/ldbwriter/ldb_writer.go index a3dfe84f..1c10d81c 100644 --- a/pkg/ldbwriter/ldb_writer.go +++ b/pkg/ldbwriter/ldb_writer.go @@ -28,9 +28,10 @@ type LDBWriteCallback interface { // LDBWriteMetadata contains the metadata about a statement that was written // to the LDB. type LDBWriteMetadata struct { - DB *sql.DB - Statement schema.DMLStatement - Changes []sqlite.SQLiteWatchChange + DB *sql.DB + Statement schema.DMLStatement + Changes []sqlite.SQLiteWatchChange + Transaction bool } // ldbWriter applies statements to a SQL database diff --git a/pkg/ldbwriter/ldb_writer_with_changelog.go b/pkg/ldbwriter/ldb_writer_with_changelog.go index 6f3bc9e5..f527d017 100644 --- a/pkg/ldbwriter/ldb_writer_with_changelog.go +++ b/pkg/ldbwriter/ldb_writer_with_changelog.go @@ -19,6 +19,8 @@ type LDBWriterWithChangelog struct { Seq int64 } +// ⚠️ WARNING: This code is not used except in a unit test. +// See ldb_callback_writer.go for the real code and more description. // // NOTE: How does the changelog work? // @@ -33,7 +35,6 @@ type LDBWriterWithChangelog struct { // This is pretty complex, but after enumerating about 8 different options, it // ended up actually being the most simple. Other options involved not-so-great // options like parsing SQL or maintaining triggers on every table. 
-// func (w *LDBWriterWithChangelog) ApplyDMLStatement(ctx context.Context, statement schema.DMLStatement) error { err := w.LdbWriter.ApplyDMLStatement(ctx, statement) if err != nil { diff --git a/pkg/reflector/reflector_test.go b/pkg/reflector/reflector_test.go index 9d8fda93..c87c3ab6 100644 --- a/pkg/reflector/reflector_test.go +++ b/pkg/reflector/reflector_test.go @@ -208,7 +208,7 @@ func TestReflector(t *testing.T) { clBytes, err := ioutil.ReadFile(changelogPath) require.NoError(t, err) - expectChangelog := "{\"seq\":1,\"family\":\"family1\",\"table\":\"table1234\",\"key\":[{\"name\":\"field1\",\"type\":\"INTEGER\",\"value\":1234}]}\n" + expectChangelog := "{\"seq\":1,\"ledgerSeq\":2,\"tx\":false,\"family\":\"family1\",\"table\":\"table1234\",\"key\":[{\"name\":\"field1\",\"type\":\"INTEGER\",\"value\":1234}]}\n" if diff := cmp.Diff(expectChangelog, string(clBytes)); diff != "" { t.Errorf("Changelog contents differ\n%s", diff) } diff --git a/pkg/sqlite/sqlite_watch.go b/pkg/sqlite/sqlite_watch.go index 9d351ce7..75f8773c 100644 --- a/pkg/sqlite/sqlite_watch.go +++ b/pkg/sqlite/sqlite_watch.go @@ -3,8 +3,8 @@ package sqlite import ( "context" "database/sql" - "github.com/pkg/errors" + "github.com/segmentio/ctlstore/pkg/ldb" "github.com/segmentio/ctlstore/pkg/scanfunc" "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/go-sqlite3" @@ -12,13 +12,14 @@ import ( type ( SQLiteWatchChange struct { - Op int - DatabaseName string - TableName string - OldRowID int64 - NewRowID int64 - OldRow []interface{} - NewRow []interface{} + Op int + DatabaseName string + TableName string + OldRowID int64 + NewRowID int64 + OldRow []interface{} + NewRow []interface{} + LedgerSequence schema.DMLSequence } // pkAndMeta is a primary key value with name and type metadata to boot pkAndMeta struct { @@ -40,6 +41,11 @@ func RegisterSQLiteWatch(dbName string, buffer *SQLChangeBuffer) error { var newRow []interface{} var oldRow []interface{} + // Don't bother propagating 
updates of our internal bookkeeping tables + if ldb.IsInternalTable(pud.TableName) { + return + } + if pud.Op == sqlite3.SQLITE_UPDATE || pud.Op == sqlite3.SQLITE_DELETE { oldRow = make([]interface{}, cnt) err := pud.Old(oldRow...)