From 7992eb1173794d4824fd81bf006033d6b18e3a7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 6 Oct 2023 21:03:06 +0200 Subject: [PATCH] Fix acks sent to Conduit on error in batch mode (#87) * fix acks sent to Conduit on error in batch mode * fix typo --- destination.go | 20 +++--- destination_test.go | 167 ++++++++++++++++++++++++++++++++++++++++++++ internal/batcher.go | 2 +- 3 files changed, 180 insertions(+), 9 deletions(-) diff --git a/destination.go b/destination.go index ddc51df..c553f7a 100644 --- a/destination.go +++ b/destination.go @@ -379,16 +379,20 @@ func (w *writeStrategyBatch) writeBatch(batch []writeBatchItem) error { } var ( - firstErr error - errOnce bool + ackResponse error + firstErr error + errOnce bool ) for i, item := range batch { - if i < n { - err := item.ack(err) - if err != nil && !errOnce { - firstErr = err - errOnce = true - } + if i == n { + // records from this index on failed to be written, include the + // error in the response + ackResponse = err + } + err := item.ack(ackResponse) + if err != nil && !errOnce { + firstErr = err + errOnce = true } } return firstErr diff --git a/destination_test.go b/destination_test.go index be9b405..ff37ef3 100644 --- a/destination_test.go +++ b/destination_test.go @@ -16,6 +16,7 @@ package sdk import ( "context" + "errors" "fmt" "io" "testing" @@ -172,6 +173,172 @@ func TestDestinationPluginAdapter_Run_Write(t *testing.T) { <-runDone } +func TestDestinationPluginAdapter_Run_WriteBatch_Success(t *testing.T) { + is := is.New(t) + ctrl := gomock.NewController(t) + dst := NewMockDestination(ctrl) + + dstPlugin := NewDestinationPlugin( + DestinationWithMiddleware(dst, DestinationWithBatch{}), + ).(*destinationPluginAdapter) + + want := Record{ + Position: Position("foo"), + Operation: OperationCreate, + Metadata: map[string]string{"foo": "bar"}, + Key: RawData("bar"), + Payload: Change{ + Before: nil, // create has no before + After: StructuredData{ + "x": "y", + "z": 3, + }, + }, + } + + batchConfig := map[string]string{ + configDestinationBatchDelay: "0s", + configDestinationBatchSize: "5", + } + + dst.EXPECT().Parameters() + dst.EXPECT().Configure(gomock.Any(), batchConfig).Return(nil) + dst.EXPECT().Open(gomock.Any()).Return(nil) + dst.EXPECT().Write(gomock.Any(), []Record{want, want, want, want, want}).Return(5, nil) + + stream, reqStream, respStream := newDestinationRunStreamMock(ctrl) + + ctx := context.Background() + _, err := dstPlugin.Configure(ctx, cpluginv1.DestinationConfigureRequest{Config: batchConfig}) + is.NoErr(err) + _, err = dstPlugin.Start(ctx, cpluginv1.DestinationStartRequest{}) + is.NoErr(err) + + runDone := make(chan struct{}) + go func() { + defer close(runDone) + err := dstPlugin.Run(ctx, stream) + is.NoErr(err) + }() + + // write 5 records + for i := 0; i < 5; i++ { + reqStream <- cpluginv1.DestinationRunRequest{ + Record: cpluginv1.Record{ + Position: want.Position, + Operation: cpluginv1.Operation(want.Operation), + Metadata: want.Metadata, + Key: cpluginv1.RawData(want.Key.(RawData)), + Payload: cpluginv1.Change{ + Before: nil, // create has no before + After: cpluginv1.StructuredData(want.Payload.After.(StructuredData)), + }, + }, + } + } + for i := 0; i < 5; i++ { + resp := <-respStream + is.Equal(resp, cpluginv1.DestinationRunResponse{ + AckPosition: want.Position, + Error: "", + }) + } + + // close stream + close(reqStream) + close(respStream) + + // wait for Run to exit + <-runDone +} + +func TestDestinationPluginAdapter_Run_WriteBatch_Partial(t *testing.T) { + is := is.New(t) + ctrl := gomock.NewController(t) + dst := NewMockDestination(ctrl) + + dstPlugin := NewDestinationPlugin( + DestinationWithMiddleware(dst, DestinationWithBatch{}), + ).(*destinationPluginAdapter) + + want := Record{ + Position: Position("foo"), + Operation: OperationCreate, + Metadata: map[string]string{"foo": "bar"}, + Key: RawData("bar"), + Payload: Change{ + Before: nil, // create has no before + After: StructuredData{ + "x": "y", + "z": 3, + }, + }, + } + + batchConfig := map[string]string{ + configDestinationBatchDelay: "0s", + configDestinationBatchSize: "5", + } + wantErr := errors.New("write error") + + dst.EXPECT().Parameters() + dst.EXPECT().Configure(gomock.Any(), batchConfig).Return(nil) + dst.EXPECT().Open(gomock.Any()).Return(nil) + dst.EXPECT().Write(gomock.Any(), []Record{want, want, want, want, want}).Return(3, wantErr) // only 3 records are written + + stream, reqStream, respStream := newDestinationRunStreamMock(ctrl) + + ctx := context.Background() + _, err := dstPlugin.Configure(ctx, cpluginv1.DestinationConfigureRequest{Config: batchConfig}) + is.NoErr(err) + _, err = dstPlugin.Start(ctx, cpluginv1.DestinationStartRequest{}) + is.NoErr(err) + + runDone := make(chan struct{}) + go func() { + defer close(runDone) + err := dstPlugin.Run(ctx, stream) + is.NoErr(err) + }() + + // write 5 records + for i := 0; i < 5; i++ { + reqStream <- cpluginv1.DestinationRunRequest{ + Record: cpluginv1.Record{ + Position: want.Position, + Operation: cpluginv1.Operation(want.Operation), + Metadata: want.Metadata, + Key: cpluginv1.RawData(want.Key.(RawData)), + Payload: cpluginv1.Change{ + Before: nil, // create has no before + After: cpluginv1.StructuredData(want.Payload.After.(StructuredData)), + }, + }, + } + } + for i := 0; i < 3; i++ { + resp := <-respStream + is.Equal(resp, cpluginv1.DestinationRunResponse{ + AckPosition: want.Position, + Error: "", + }) + } + for i := 0; i < 2; i++ { + resp := <-respStream + is.Equal(resp, cpluginv1.DestinationRunResponse{ + AckPosition: want.Position, + Error: wantErr.Error(), + }) + } + + // close stream + close(reqStream) + close(respStream) + + // wait for Run to exit + <-runDone +} + func TestDestinationPluginAdapter_Stop_AwaitLastRecord(t *testing.T) { is := is.New(t) ctrl := gomock.NewController(t) diff --git a/internal/batcher.go b/internal/batcher.go index 8e42a66..2387136 100644 --- a/internal/batcher.go +++ b/internal/batcher.go @@ -69,7 +69,7 @@ func (b *Batcher[T]) Enqueue(item T) EnqueueStatus { _ = b.flushNow() return Flushed } - if b.flushTimer == nil { + if b.flushTimer == nil && b.delayThreshold > 0 { b.flushTimer = time.AfterFunc(b.delayThreshold, func() { b.Flush() }) } return Scheduled