Skip to content

Commit

Permalink
Add batch timeout as configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Sep 20, 2024
1 parent e405365 commit bae6446
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 18 deletions.
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"os"
"time"

"github.com/inngest/dbcap/pkg/changeset"
"github.com/inngest/dbcap/pkg/eventwriter"
Expand Down Expand Up @@ -33,7 +34,7 @@ func main() {
panic(err)
}

writer := eventwriter.NewCallbackWriter(ctx, 1, func(batch []*changeset.Changeset) error {
writer := eventwriter.NewCallbackWriter(ctx, 1, 50*time.Millisecond, func(batch []*changeset.Changeset) error {
if len(batch) == 0 || batch[0] == nil {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/eventwriter/api_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ func ChangesetToEvent(cs changeset.Changeset) map[string]any {
func NewAPIClientWriter(
ctx context.Context,
batchSize int,
batchTimeout time.Duration,
client inngestgo.Client,
) EventWriter {
return NewCallbackWriter(ctx, batchSize, func(cs []*changeset.Changeset) error {
return NewCallbackWriter(ctx, batchSize, batchTimeout, func(cs []*changeset.Changeset) error {
return send(client, cs)
})
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/eventwriter/callback_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
func NewCallbackWriter(
ctx context.Context,
batchSize int,
batchTimeout time.Duration,
onChangeset func(cs []*changeset.Changeset) error,
) EventWriter {
cs := make(chan *changeset.Changeset, batchSize)
Expand All @@ -28,13 +29,18 @@ func NewCallbackWriter(
type cbWriter struct {
onChangeset func([]*changeset.Changeset) error

cs chan *changeset.Changeset
batchSize int
cs chan *changeset.Changeset
batchSize int
batchTimeout time.Duration

wg sync.WaitGroup
}

func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkCommitter) chan *changeset.Changeset {
if a.batchTimeout < 100*time.Millisecond {
a.batchTimeout = 100 * time.Millisecond
}

a.wg.Add(1)
go func() {
defer a.wg.Done()
Expand All @@ -45,7 +51,7 @@ func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkComm
// sendCtx is an additional uncancelled CTX which will be cancelled
// 5 seconds after the
for {
timer := time.NewTimer(batchTimeout)
timer := time.NewTimer(a.batchTimeout)

select {
case <-ctx.Done():
Expand All @@ -59,7 +65,7 @@ func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkComm
case <-timer.C:
// Force sending current batch
if i == 0 {
timer.Reset(batchTimeout)
timer.Reset(a.batchTimeout)
continue
}

Expand Down Expand Up @@ -91,7 +97,7 @@ func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkComm
buf[i] = msg
i++
// Send this batch after at least 5 seconds
timer.Reset(batchTimeout)
timer.Reset(a.batchTimeout)
}
}
}()
Expand Down
7 changes: 0 additions & 7 deletions pkg/eventwriter/eventwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package eventwriter

import (
"context"
"time"

"github.com/inngest/dbcap/pkg/changeset"
)
Expand All @@ -12,12 +11,6 @@ const (
eventPrefix = "pg"
)

var (
// batchTimeout represents the time in which we wait for the event writer batch
// to fill before sending the current batch of events.
batchTimeout = 100 * time.Millisecond
)

type EventWriter interface {
// Listen returns a channel in which Changesets can be published. Any published
// changesets will be broadcast as an event.
Expand Down
8 changes: 4 additions & 4 deletions pkg/replicator/pgreplicator/pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestCommit(t *testing.T) {

// Set up event writer which listens to changes
var latestReceivedLSN pglogrepl.LSN
cb := eventwriter.NewCallbackWriter(ctx, 1, func(cs []*changeset.Changeset) error {
cb := eventwriter.NewCallbackWriter(ctx, 1, time.Millisecond, func(cs []*changeset.Changeset) error {
latestReceivedLSN = cs[0].Watermark.LSN
return nil
})
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestInsert(t *testing.T) {
inserts int32
)

cb := eventwriter.NewCallbackWriter(ctx, 1, func(batch []*changeset.Changeset) error {
cb := eventwriter.NewCallbackWriter(ctx, 1, time.Millisecond, func(batch []*changeset.Changeset) error {
cs := batch[0]
next := atomic.AddInt32(&inserts, 1)

Expand Down Expand Up @@ -245,7 +245,7 @@ func TestUpdateMany_ReplicaIdentityFull(t *testing.T) {
updates int32
)

cb := eventwriter.NewCallbackWriter(ctx, 1, func(batch []*changeset.Changeset) error {
cb := eventwriter.NewCallbackWriter(ctx, 1, time.Millisecond, func(batch []*changeset.Changeset) error {
cs := batch[0]
atomic.AddInt32(&total, 1)

Expand Down Expand Up @@ -398,7 +398,7 @@ func TestUpdateMany_DisableReplicaIdentityFull(t *testing.T) {
updates int32
)

cb := eventwriter.NewCallbackWriter(ctx, 1, func(batch []*changeset.Changeset) error {
cb := eventwriter.NewCallbackWriter(ctx, 1, time.Millisecond, func(batch []*changeset.Changeset) error {
cs := batch[0]
atomic.AddInt32(&total, 1)

Expand Down

0 comments on commit bae6446

Please sign in to comment.