[CF-557] Changelog updates should wait for transaction commit #137

Open · wants to merge 16 commits into master
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
-FROM golang:1.20-alpine
+FROM golang:1.20-alpine3.18
ENV SRC github.com/segmentio/ctlstore
ARG VERSION

28 changes: 15 additions & 13 deletions pkg/changelog/changelog_writer.go
@@ -2,8 +2,8 @@ package changelog

import (
"encoding/json"

"github.com/pkg/errors"
"github.com/segmentio/ctlstore/pkg/schema"
"github.com/segmentio/events/v2"
)

@@ -16,25 +16,27 @@ type (
WriteLine WriteLine
}
ChangelogEntry struct {
-		Seq    int64
-		Family string
-		Table  string
-		Key    []interface{}
+		Seq         int64
+		Family      string
+		Table       string
+		Key         []interface{}
+		LedgerSeq   schema.DMLSequence
+		Transaction bool
}
)

func NewChangelogEntry(seq int64, family string, table string, key []interface{}) *ChangelogEntry {
return &ChangelogEntry{Seq: seq, Family: family, Table: table, Key: key}
}

func (w *ChangelogWriter) WriteChange(e ChangelogEntry) error {
structure := struct {
Contributor:
I don't really get why we inline this struct definition instead of just putting the JSON tags on ChangelogEntry.

Collaborator (author):
This is a good question -- I don't get it either.

It reminds me that I'm unsure about why we have 2 separate structs for the changelog entries. Felt like I was having to sprinkle the same fields all over the place to get it through the pipes.

  • writing (this code):
    func (w *ChangelogWriter) WriteChange(e ChangelogEntry) error {
    structure := struct {
    Seq int64 `json:"seq"`
    Family string `json:"family"`
    Table string `json:"table"`
    Key []interface{} `json:"key"`
    }{
  • reading:
    // entry represents a single row in the changelog
    // e.g.
    // {"seq":1,"family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]}
    type entry struct {
    Seq int64 `json:"seq"`
    Family string `json:"family"`
    Table string `json:"table"`
    Key []Key `json:"key"`
    }

Contributor:
I wonder if that's a consequence of calling the root of this pkg vs internal

Contributor:
Feels like there should be just one definition of this data.

Contributor:
With the json annotations too.

Contributor:
FWIW, answers by Copilot:

What would you suspect is the reason why, in this file, the structure in the WriteChange function duplicates ChangelogEntry? Why would the developer who wrote the code not reuse ChangelogEntry?

The developer might have chosen to define a separate structure within the WriteChange function for a few reasons:

  1. Field Tagging: The inline structure allows the developer to add struct tags (json:"seq", json:"family", etc.) which are used for JSON marshalling. These tags define how the struct fields are encoded to JSON, which is not specified in the ChangelogEntry struct.

  2. Encapsulation: The ChangelogEntry struct might be used in various parts of the codebase, and the developer might not want to tie its implementation too closely to the JSON encoding used in this specific function. By using a separate structure for the JSON encoding, changes to the ChangelogEntry struct will not directly impact the WriteChange function.

  3. Flexibility: If in the future, the WriteChange function needs to include additional data or transform the data before writing, having a separate structure within the function makes it easier to do so without affecting the ChangelogEntry struct.

In general, this approach provides a level of decoupling between the ChangelogEntry data structure and the specific requirements of the WriteChange function.

-		Seq    int64         `json:"seq"`
-		Family string        `json:"family"`
-		Table  string        `json:"table"`
-		Key    []interface{} `json:"key"`
+		Seq         int64         `json:"seq"`
+		LedgerSeq   int64         `json:"ledgerSeq"`
+		Transaction bool          `json:"tx"`
+		Family      string        `json:"family"`
+		Table       string        `json:"table"`
+		Key         []interface{} `json:"key"`
}{
e.Seq,
e.LedgerSeq.Int(),
e.Transaction,
e.Family,
e.Table,
e.Key,
2 changes: 1 addition & 1 deletion pkg/changelog/changelog_writer_test.go
@@ -32,5 +32,5 @@ func TestWriteChange(t *testing.T) {
})
require.NoError(t, err)
require.EqualValues(t, 1, len(mock.Lines))
-	require.Equal(t, `{"seq":42,"family":"family1","table":"table1","key":[18014398509481984,"foo"]}`, mock.Lines[0])
+	require.Equal(t, `{"seq":42,"ledgerSeq":0,"tx":false,"family":"family1","table":"table1","key":[18014398509481984,"foo"]}`, mock.Lines[0])
}
2 changes: 1 addition & 1 deletion pkg/cmd/ctlstore/main.go
@@ -114,7 +114,7 @@ type supervisorCliConfig struct {
type ledgerHealthConfig struct {
Disable bool `conf:"disable" help:"disable ledger latency health attributing (DEPRECATED: use disable-ecs-behavior instead)"`
DisableECSBehavior bool `conf:"disable-ecs-behavior" help:"disable ledger latency health attributing"`
MaxHealthyLatency time.Duration `conf:"max-healty-latency" help:"Max latency considered healthy"`
Contributor:
One of the annoying aspects of this being OSS is that we can't verify this change doesn't break anything for external users. Internally, I didn't see any hits for "healty" related to ctlstore, though.

@erikdw (Collaborator, author), Jan 16, 2024:
I searched internally too -- it's a good call out that this could break something externally, but my belief is that no one is actually using this system outside of Segment, so I'm not too concerned.

That being said, I could be convinced to add an "unsupported" entry that has the typo'ed configuration name and then panics. i.e., with this change if someone had the typo'ed configuration knob it would just be silently ignored AFAIU.

Or even have logic to have that typo'ed configuration entry configure the "right" one and spit out a loud deprecation warning message. We don't have real releases so it's a bit unclear what the best strategy is.

Contributor:
I'm fine just doing a breaking change. It may not be great OSS stewardship, but I have a hard time believing there's even a single other user of ctlstore outside of Segment.

MaxHealthyLatency time.Duration `conf:"max-healthy-latency" help:"Max latency considered healthy"`
AttributeName string `conf:"attribute-name" help:"The name of the attribute"`
HealthyAttributeValue string `conf:"healthy-attribute-value" help:"The value of the attribute if healthy"`
UnhealthyAttributeValue string `conf:"unhealth-attribute-value" help:"The value of the attribute if unhealthy"`
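The deprecated-alias idea from the thread above could be sketched like this. This is an assumption about how it might look, not the PR's code; resolveMaxHealthyLatency is a hypothetical helper, and the real conf-library wiring would differ.

```go
package main

import "fmt"

// resolveMaxHealthyLatency is a hypothetical shim: honor the old,
// typo'ed "max-healty-latency" value if only it is set, and complain
// loudly so operators migrate to "max-healthy-latency".
func resolveMaxHealthyLatency(deprecated, current string) string {
	if deprecated != "" && current == "" {
		fmt.Println(`warning: "max-healty-latency" is deprecated; use "max-healthy-latency"`)
		return deprecated
	}
	return current
}

func main() {
	fmt.Println(resolveMaxHealthyLatency("5s", ""))  // old knob still honored, with a warning
	fmt.Println(resolveMaxHealthyLatency("", "10s")) // new knob wins
}
```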
2 changes: 1 addition & 1 deletion pkg/event/changelog_test.go
@@ -135,7 +135,7 @@ func TestChangelog(t *testing.T) {
numEvents: 10000,
rotateAfterBytes: 1024 * 128,
writeDelay: 100 * time.Microsecond,
-			mustRotateN:      8,
+			mustRotateN:      9,
},
} {
t.Run(test.name, func(t *testing.T) {
17 changes: 11 additions & 6 deletions pkg/event/entry.go
@@ -2,18 +2,23 @@ package event

// entry represents a single row in the changelog
// e.g.
-// {"seq":1,"family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]}
+//
+// {"seq":1,"ledgerSeq":42,"tx":false,"family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]}
type entry struct {
-	Seq    int64  `json:"seq"`
-	Family string `json:"family"`
-	Table  string `json:"table"`
-	Key    []Key  `json:"key"`
+	Seq         int64  `json:"seq"`
+	LedgerSeq   int64  `json:"ledgerSeq"`
+	Transaction bool   `json:"tx"`
+	Family      string `json:"family"`
+	Table       string `json:"table"`
+	Key         []Key  `json:"key"`
}

// event converts the entry into an event for the iterator to return
func (e entry) event() Event {
return Event{
-		Sequence:  e.Seq,
+		Sequence:       e.Seq,
+		LedgerSequence: e.LedgerSeq,
+		Transaction:    e.Transaction,
RowUpdate: RowUpdate{
FamilyName: e.Family,
TableName: e.Table,
6 changes: 4 additions & 2 deletions pkg/event/event.go
@@ -2,8 +2,10 @@ package event

// Event is the type that the Iterator produces
type Event struct {
-	Sequence  int64
-	RowUpdate RowUpdate
+	Sequence       int64
+	LedgerSequence int64
+	Transaction    bool
+	RowUpdate      RowUpdate
}

// RowUpdate represents a single row update
2 changes: 1 addition & 1 deletion pkg/executive/db_executive.go
@@ -319,7 +319,7 @@ func (e *dbExecutive) AddFields(familyName string, tableName string, fieldNames
}

// We first write the column modification to the DML ledger within the transaction.
-// It's important that this is done befored the DDL is applied to the ctldb, as
+// It's important that this is done before the DDL is applied to the ctldb, as
// the DDL is not able to be rolled back. In this way, if the DDL fails, the DML
// can be rolled back.
dlw := dmlLedgerWriter{Tx: tx, TableName: dmlLedgerTableName}
4 changes: 4 additions & 0 deletions pkg/ldb/ldbs.go
@@ -121,3 +121,7 @@ func FetchSeqFromLdb(ctx context.Context, db *sql.DB) (schema.DMLSequence, error
}
return schema.DMLSequence(seq), err
}

func IsInternalTable(name string) bool {
return name == LDBSeqTableName || name == LDBLastUpdateTableName
}
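A self-contained sketch of how the new IsInternalTable helper can be used, e.g. to skip internal bookkeeping tables when iterating an LDB. The constant values here are placeholders, not necessarily ctlstore's real table names.

```go
package main

import "fmt"

// Placeholder values; the real constants live in pkg/ldb.
const (
	LDBSeqTableName        = "_ldb_seq"
	LDBLastUpdateTableName = "_ldb_last_update"
)

// IsInternalTable mirrors the helper added in this PR: internal
// bookkeeping tables should not be treated as user data.
func IsInternalTable(name string) bool {
	return name == LDBSeqTableName || name == LDBLastUpdateTableName
}

func main() {
	for _, t := range []string{LDBSeqTableName, "fam___mytable"} {
		if IsInternalTable(t) {
			fmt.Println("skipping internal table:", t)
			continue
		}
		fmt.Println("processing table:", t)
	}
}
```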
10 changes: 6 additions & 4 deletions pkg/ldbwriter/changelog_callback.go
@@ -37,10 +37,12 @@ func (c *ChangelogCallback) LDBWritten(ctx context.Context, data LDBWriteMetadat
for _, key := range keys {
seq := atomic.AddInt64(&c.Seq, 1)
err = c.ChangelogWriter.WriteChange(changelog.ChangelogEntry{
-				Seq:    seq,
-				Family: fam.Name,
-				Table:  tbl.Name,
-				Key:    key,
+				Seq:         seq,
+				LedgerSeq:   change.LedgerSequence,
+				Transaction: data.Transaction,
+				Family:      fam.Name,
+				Table:       tbl.Name,
+				Key:         key,
})
if err != nil {
events.Log("Skipped logging change to %{family}s.%{table}s:%{key}v: %{err}v",
84 changes: 76 additions & 8 deletions pkg/ldbwriter/ldb_callback_writer.go
@@ -3,33 +3,101 @@ package ldbwriter
import (
"context"
"database/sql"

"github.com/segmentio/ctlstore/pkg/schema"
"github.com/segmentio/ctlstore/pkg/sqlite"
"github.com/segmentio/events/v2"
"github.com/segmentio/stats/v4"
)

// CallbackWriter is an LDBWriter that delegates to another
// writer and then, upon a successful write, executes N callbacks.
type CallbackWriter struct {
-	DB        *sql.DB
-	Delegate  LDBWriter
-	Callbacks []LDBWriteCallback
+	DB        *sql.DB
+	Delegate  LDBWriter
+	Callbacks []LDBWriteCallback
+	// Buffer between SQLite preupdate Hook and this code
+	ChangeBuffer *sqlite.SQLChangeBuffer
+	// Accumulated changes across multiple ApplyDMLStatement calls
+	transactionChanges []sqlite.SQLiteWatchChange
Contributor:
should we consider a cap on the size of a transaction so we don't buffer too much?

@erikdw (Collaborator, author), Jan 16, 2024:
Interesting proposal; issues around that are where my head was at as I was adding the metrics.

This consideration relates to the "support SoR transactions" project. We don't yet know how large SoR transactions can get, especially for those that we care about for that project. Putting some cap here with a loud complaint that we exceeded it (log + metric) and then dumping the currently buffered changes to the changelog (i.e., invoking callbacks) seems a fine behavior for now.

We know that the changes would not be greater than 200 right now based on hardcoded limits elsewhere & the behavior of the REPLACE INTO, so perhaps just setting it to like 500 for now would be good. i.e.,

  • REPLACE INTO is translated by SQLite into a DELETE op then an INSERT op ➡️ 2 changes.
  • max of 100 entries in a ledger transaction:
    // Reject requests that are too large
    if len(requests) > limits.LimitMaxMutateRequestCount {
    return &errs.PayloadTooLargeError{Err: "Number of requests exceeds maximum"}
    }
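The cap discussed in this thread could be sketched as follows. The limit of 500 and the flush-early behavior are assumptions drawn from the discussion, not what the PR implements.

```go
package main

import "fmt"

// maxBufferedChanges is an assumed cap: the thread suggests ~500, derived
// from the 100-request mutate limit times 2 ops per REPLACE INTO, with
// headroom.
const maxBufferedChanges = 500

type change struct{ table string }

type callbackWriter struct {
	transactionChanges []change
}

// accumulateChanges buffers changes for the open transaction; if the
// buffer exceeds the cap, it complains loudly and flushes early rather
// than growing without bound.
func (w *callbackWriter) accumulateChanges(changes []change) (flushed []change) {
	w.transactionChanges = append(w.transactionChanges, changes...)
	if len(w.transactionChanges) > maxBufferedChanges {
		fmt.Printf("warning: transaction exceeded %d buffered changes; flushing early\n", maxBufferedChanges)
		flushed = w.transactionChanges
		w.transactionChanges = nil
	}
	return
}

func main() {
	w := &callbackWriter{}
	flushed := w.accumulateChanges(make([]change, maxBufferedChanges+1))
	fmt.Println(len(flushed), len(w.transactionChanges)) // 501 0
}
```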

Contributor:
that sounds good to me

Contributor:
This concern isn't really about today's transactions, since they are size-limited, but about the future, when we support SoR transactions. A sane cap on statements within a transaction is smart, but if we add one we need to make sure the limit is well understood externally. Once we say we support transactions, we need to be sure we really do.

}

func (w *CallbackWriter) inTransaction() bool {
return w.transactionChanges != nil
}

func (w *CallbackWriter) beginTransaction(ledgerSequence schema.DMLSequence) {
if len(w.transactionChanges) > 0 {
// This should never happen, but just in case...
stats.Add("ldb_changes_abandoned", len(w.transactionChanges))
events.Log("error: abandoned %{count}d changes from incomplete transaction, current statement's ledger sequence: %{sequence}d",
len(w.transactionChanges), ledgerSequence)
}
w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0)
}

// Transaction done! Return the accumulated changes including the latest ones
func (w *CallbackWriter) endTransaction(changes []sqlite.SQLiteWatchChange) (transactionChanges []sqlite.SQLiteWatchChange) {
w.accumulateChanges(changes)
transactionChanges = w.transactionChanges
w.transactionChanges = nil
return
}

// Transaction isn't over yet, save the latest changes
func (w *CallbackWriter) accumulateChanges(changes []sqlite.SQLiteWatchChange) {
w.transactionChanges = append(w.transactionChanges, changes...)
}

// ApplyDMLStatement
//
// It is not obvious, but this code executes synchronously:
// 1. Delegate.ApplyDMLStatement executes the DML statement against the SQLite LDB.
// (⚠️ WARNING: That's what the code is wired up to do today, January 2024, though the Delegate
// could be doing other things since the code is so flexible.)
// 2. When SQLite processes the statement it invokes our preupdate hook (see sqlite_watch.go).
// 3. Our preupdate hook writes the changes to the change buffer.
// 4. The code returns here, and we decide whether to process the change buffer immediately or
// wait until the end of the ledger transaction.
func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema.DMLStatement) error {
err := w.Delegate.ApplyDMLStatement(ctx, statement)
if err != nil {
return err
}

// If beginning a transaction then start accumulating changes, don't send them out yet
if statement.Statement == schema.DMLTxBeginKey {
w.beginTransaction(statement.Sequence)
return nil
}

changes := w.ChangeBuffer.Pop()

// Record the responsible ledger sequence in each change so that the callback can use it
for i := range changes {
changes[i].LedgerSequence = statement.Sequence
}

var transaction bool
if w.inTransaction() {
transaction = true
if statement.Statement == schema.DMLTxEndKey {
// Transaction done, let's send what we have accumulated
changes = w.endTransaction(changes)
} else {
// Transaction not over, continue accumulating
w.accumulateChanges(changes)
return nil
}
}

stats.Observe("ldb_changes_written", len(changes))
for _, callback := range w.Callbacks {
-		events.Debug("Writing DML callback for %{cb}T", callback)
+		events.Debug("Writing DML callback for %{cb}T with %{changeCount}d changes", callback, len(changes))
callback.LDBWritten(ctx, LDBWriteMetadata{
-			DB:        w.DB,
-			Statement: statement,
-			Changes:   changes,
+			DB:          w.DB,
+			Statement:   statement,
+			Changes:     changes,
+			Transaction: transaction,
})
}
return nil