Skip to content

Commit

Permalink
Fix acks sent to Conduit on error in batch mode (#87)
Browse files Browse the repository at this point in the history
* fix acks sent to Conduit on error in batch mode

* fix typo
  • Loading branch information
lovromazgon authored Oct 6, 2023
1 parent aa990a7 commit 7992eb1
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 9 deletions.
20 changes: 12 additions & 8 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,16 +379,20 @@ func (w *writeStrategyBatch) writeBatch(batch []writeBatchItem) error {
}

var (
firstErr error
errOnce bool
ackResponse error
firstErr error
errOnce bool
)
for i, item := range batch {
if i < n {
err := item.ack(err)
if err != nil && !errOnce {
firstErr = err
errOnce = true
}
if i == n {
// records from this index on failed to be written, include the
// error in the response
ackResponse = err
}
err := item.ack(ackResponse)
if err != nil && !errOnce {
firstErr = err
errOnce = true
}
}
return firstErr
Expand Down
167 changes: 167 additions & 0 deletions destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package sdk

import (
"context"
"errors"
"fmt"
"io"
"testing"
Expand Down Expand Up @@ -172,6 +173,172 @@ func TestDestinationPluginAdapter_Run_Write(t *testing.T) {
<-runDone
}

func TestDestinationPluginAdapter_Run_WriteBatch_Success(t *testing.T) {
is := is.New(t)
ctrl := gomock.NewController(t)
dst := NewMockDestination(ctrl)

dstPlugin := NewDestinationPlugin(
DestinationWithMiddleware(dst, DestinationWithBatch{}),
).(*destinationPluginAdapter)

want := Record{
Position: Position("foo"),
Operation: OperationCreate,
Metadata: map[string]string{"foo": "bar"},
Key: RawData("bar"),
Payload: Change{
Before: nil, // create has no before
After: StructuredData{
"x": "y",
"z": 3,
},
},
}

batchConfig := map[string]string{
configDestinationBatchDelay: "0s",
configDestinationBatchSize: "5",
}

dst.EXPECT().Parameters()
dst.EXPECT().Configure(gomock.Any(), batchConfig).Return(nil)
dst.EXPECT().Open(gomock.Any()).Return(nil)
dst.EXPECT().Write(gomock.Any(), []Record{want, want, want, want, want}).Return(5, nil)

stream, reqStream, respStream := newDestinationRunStreamMock(ctrl)

ctx := context.Background()
_, err := dstPlugin.Configure(ctx, cpluginv1.DestinationConfigureRequest{Config: batchConfig})
is.NoErr(err)
_, err = dstPlugin.Start(ctx, cpluginv1.DestinationStartRequest{})
is.NoErr(err)

runDone := make(chan struct{})
go func() {
defer close(runDone)
err := dstPlugin.Run(ctx, stream)
is.NoErr(err)
}()

// write 5 records
for i := 0; i < 5; i++ {
reqStream <- cpluginv1.DestinationRunRequest{
Record: cpluginv1.Record{
Position: want.Position,
Operation: cpluginv1.Operation(want.Operation),
Metadata: want.Metadata,
Key: cpluginv1.RawData(want.Key.(RawData)),
Payload: cpluginv1.Change{
Before: nil, // create has no before
After: cpluginv1.StructuredData(want.Payload.After.(StructuredData)),
},
},
}
}
for i := 0; i < 5; i++ {
resp := <-respStream
is.Equal(resp, cpluginv1.DestinationRunResponse{
AckPosition: want.Position,
Error: "",
})
}

// close stream
close(reqStream)
close(respStream)

// wait for Run to exit
<-runDone
}

func TestDestinationPluginAdapter_Run_WriteBatch_Partial(t *testing.T) {
is := is.New(t)
ctrl := gomock.NewController(t)
dst := NewMockDestination(ctrl)

dstPlugin := NewDestinationPlugin(
DestinationWithMiddleware(dst, DestinationWithBatch{}),
).(*destinationPluginAdapter)

want := Record{
Position: Position("foo"),
Operation: OperationCreate,
Metadata: map[string]string{"foo": "bar"},
Key: RawData("bar"),
Payload: Change{
Before: nil, // create has no before
After: StructuredData{
"x": "y",
"z": 3,
},
},
}

batchConfig := map[string]string{
configDestinationBatchDelay: "0s",
configDestinationBatchSize: "5",
}
wantErr := errors.New("write error")

dst.EXPECT().Parameters()
dst.EXPECT().Configure(gomock.Any(), batchConfig).Return(nil)
dst.EXPECT().Open(gomock.Any()).Return(nil)
dst.EXPECT().Write(gomock.Any(), []Record{want, want, want, want, want}).Return(3, wantErr) // only 3 records are written

stream, reqStream, respStream := newDestinationRunStreamMock(ctrl)

ctx := context.Background()
_, err := dstPlugin.Configure(ctx, cpluginv1.DestinationConfigureRequest{Config: batchConfig})
is.NoErr(err)
_, err = dstPlugin.Start(ctx, cpluginv1.DestinationStartRequest{})
is.NoErr(err)

runDone := make(chan struct{})
go func() {
defer close(runDone)
err := dstPlugin.Run(ctx, stream)
is.NoErr(err)
}()

// write 5 records
for i := 0; i < 5; i++ {
reqStream <- cpluginv1.DestinationRunRequest{
Record: cpluginv1.Record{
Position: want.Position,
Operation: cpluginv1.Operation(want.Operation),
Metadata: want.Metadata,
Key: cpluginv1.RawData(want.Key.(RawData)),
Payload: cpluginv1.Change{
Before: nil, // create has no before
After: cpluginv1.StructuredData(want.Payload.After.(StructuredData)),
},
},
}
}
for i := 0; i < 3; i++ {
resp := <-respStream
is.Equal(resp, cpluginv1.DestinationRunResponse{
AckPosition: want.Position,
Error: "",
})
}
for i := 0; i < 2; i++ {
resp := <-respStream
is.Equal(resp, cpluginv1.DestinationRunResponse{
AckPosition: want.Position,
Error: wantErr.Error(),
})
}

// close stream
close(reqStream)
close(respStream)

// wait for Run to exit
<-runDone
}

func TestDestinationPluginAdapter_Stop_AwaitLastRecord(t *testing.T) {
is := is.New(t)
ctrl := gomock.NewController(t)
Expand Down
2 changes: 1 addition & 1 deletion internal/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (b *Batcher[T]) Enqueue(item T) EnqueueStatus {
_ = b.flushNow()
return Flushed
}
if b.flushTimer == nil {
if b.flushTimer == nil && b.delayThreshold > 0 {
b.flushTimer = time.AfterFunc(b.delayThreshold, func() { b.Flush() })
}
return Scheduled
Expand Down

0 comments on commit 7992eb1

Please sign in to comment.