Skip to content

Commit

Permalink
Merge branch 'main' into lovro/write-batch-acks
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Oct 6, 2023
2 parents b951e3c + aa990a7 commit 7d39ce8
Show file tree
Hide file tree
Showing 14 changed files with 1,022 additions and 316 deletions.
15 changes: 6 additions & 9 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/conduitio/conduit-connector-protocol/cpluginv1"
"github.com/conduitio/conduit-connector-sdk/internal"
"github.com/conduitio/conduit-connector-sdk/internal/csync"
"go.uber.org/multierr"
)

Expand Down Expand Up @@ -99,7 +100,7 @@ func NewDestinationPlugin(impl Destination) cpluginv1.DestinationPlugin {
type destinationPluginAdapter struct {
impl Destination

lastPosition *internal.AtomicValueWatcher[Position]
lastPosition *csync.ValueWatcher[Position]
openCancel context.CancelFunc

// write is the chosen write strategy, either single records or batches
Expand Down Expand Up @@ -166,7 +167,7 @@ func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context, c
}

func (a *destinationPluginAdapter) Start(ctx context.Context, _ cpluginv1.DestinationStartRequest) (cpluginv1.DestinationStartResponse, error) {
a.lastPosition = new(internal.AtomicValueWatcher[Position])
a.lastPosition = new(csync.ValueWatcher[Position])

// detach context, so we can control when it's canceled
ctxOpen := internal.DetachContext(ctx)
Expand Down Expand Up @@ -205,7 +206,7 @@ func (a *destinationPluginAdapter) Run(ctx context.Context, stream cpluginv1.Des
err = a.writeStrategy.Write(ctx, r, func(err error) error {
return a.ack(r, err, stream)
})
a.lastPosition.Store(r.Position)
a.lastPosition.Set(r.Position)
if err != nil {
return err
}
Expand All @@ -232,14 +233,10 @@ func (a *destinationPluginAdapter) Stop(ctx context.Context, req cpluginv1.Desti
// last thing we do is cancel context in Open
defer a.openCancel()

// wait for at most 1 minute
waitCtx, cancel := context.WithTimeout(ctx, time.Minute) // TODO make the timeout configurable (https://github.com/ConduitIO/conduit/issues/183)
defer cancel()

// wait for last record to be received
err := a.lastPosition.Await(waitCtx, func(val Position) bool {
_, err := a.lastPosition.Watch(ctx, func(val Position) bool {
return bytes.Equal(val, req.LastPosition)
})
}, csync.WithTimeout(time.Minute)) // TODO make the timeout configurable (https://github.com/ConduitIO/conduit/issues/183)

// flush cached records, allow it to take at most 1 minute
flushCtx, cancel := context.WithTimeout(ctx, time.Minute) // TODO make the timeout configurable
Expand Down
117 changes: 0 additions & 117 deletions internal/atomicvaluewatcher.go

This file was deleted.

160 changes: 0 additions & 160 deletions internal/atomicvaluewatcher_test.go

This file was deleted.

Loading

0 comments on commit 7d39ce8

Please sign in to comment.