Skip to content

Commit

Permalink
Connector state (fix races) (#89)
Browse files Browse the repository at this point in the history
* csync run tests

* track connector state

* make connector state watcher safer to use

* remove commented code

* improve states

* update golangci-lint

* update golangci-lint v1.55.2

* add test for source stop

* check error in test

* fix comment

Co-authored-by: Maha Hajja <[email protected]>

* organize imports, simplify teardown

* comment about timeouts on Stop

---------

Co-authored-by: Maha Hajja <[email protected]>
  • Loading branch information
lovromazgon and maha-hajja authored Nov 9, 2023
1 parent e83c8d6 commit 4d9c681
Show file tree
Hide file tree
Showing 11 changed files with 617 additions and 113 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: "1.21"
go-version-file: 'go.mod'

- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.52.2
version: v1.55.2
args: --timeout=2m
36 changes: 31 additions & 5 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,34 @@ func (a *destinationPluginAdapter) ack(r Record, writeErr error, stream cpluginv
return nil
}

// Stop will initiate the stop of the destination connector. It will first wait
// until the last position processed by the connector matches the last position
// in the request and then trigger a flush, in case there are any cached records
// (relevant in case of batching).
// If the requested last position is not encountered in 1 minute it will proceed
// flushing records received so far and return an error. Flushing of records
// also has a timeout of 1 minute, after which the stop operation returns with
// an error. In the worst case this operation can thus take 2 minutes.
func (a *destinationPluginAdapter) Stop(ctx context.Context, req cpluginv1.DestinationStopRequest) (cpluginv1.DestinationStopResponse, error) {
// last thing we do is cancel context in Open
defer a.openCancel()

// wait for last record to be received
_, err := a.lastPosition.Watch(ctx, func(val Position) bool {
return bytes.Equal(val, req.LastPosition)
}, csync.WithTimeout(time.Minute)) // TODO make the timeout configurable (https://github.com/ConduitIO/conduit/issues/183)
// wait for last record to be received, if it doesn't arrive in time we try
// to flush what we have so far
actualLastPosition, err := a.lastPosition.Watch(
ctx,
func(val Position) bool {
return bytes.Equal(val, req.LastPosition)
},
csync.WithTimeout(stopTimeout),
)
if err != nil {
err = fmt.Errorf("did not encounter expected last position %q, actual last position %q: %w", req.LastPosition, actualLastPosition, err)
Logger(ctx).Warn().Err(err).Msg("proceeding to flush records that were received so far (other records won't be acked)")
}

// flush cached records, allow it to take at most 1 minute
flushCtx, cancel := context.WithTimeout(ctx, time.Minute) // TODO make the timeout configurable
flushCtx, cancel := context.WithTimeout(ctx, stopTimeout)
defer cancel()

flushErr := a.writeStrategy.Flush(flushCtx)
Expand All @@ -253,6 +270,15 @@ func (a *destinationPluginAdapter) Stop(ctx context.Context, req cpluginv1.Desti
}

func (a *destinationPluginAdapter) Teardown(ctx context.Context, _ cpluginv1.DestinationTeardownRequest) (cpluginv1.DestinationTeardownResponse, error) {
// cancel open context, in case Stop was not called (can happen in case the
// stop was triggered by an error)
// teardown can be called without "open" being called previously
// e.g. when Conduit is validating a connector configuration,
// it will call "configure" and then "teardown".
if a.openCancel != nil {
a.openCancel()
}

err := a.impl.Teardown(ctx)
if err != nil {
return cpluginv1.DestinationTeardownResponse{}, err
Expand Down
118 changes: 118 additions & 0 deletions internal/connectorstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate stringer -type ConnectorState -trimprefix State

package internal

import (
"context"
"fmt"
"slices"

"github.com/conduitio/conduit-connector-sdk/internal/csync"
)

// ConnectorState is an integer enum describing the state a connector is in.
// The numeric order of the states is significant: ConnectorStateWatcher.Set
// and ConnectorStateWatcher.DoWithLock go through swap, which only moves the
// state to a strictly higher value.
// The String method is generated by stringer (see the go:generate directive
// at the top of this file).
type ConnectorState int

// Connector states, declared in increasing order.
const (
StateInitial ConnectorState = iota
StateConfiguring
StateConfigured
StateStarting
StateStarted
StateInitiatingRun
StateRunning
StateInitiatingStop
StateStopping
StateStopped
StateTearingDown
StateTornDown

// StateErrored has a value higher than any of the states above.
// DoWithLock sets it when the function it executes returns an error.
StateErrored ConnectorState = 500
)

// ConnectorStateWatcher is a csync.ValueWatcher holding a ConnectorState. It
// adds the methods DoWithLock and Set, which update the state only if the new
// state is higher than the current one (see swap).
type ConnectorStateWatcher csync.ValueWatcher[ConnectorState]

// DoWithLockOptions configures the behavior of ConnectorStateWatcher.DoWithLock.
type DoWithLockOptions struct {
// ExpectedStates lists the states in which the function is allowed to be
// executed. If it is empty the current state is not checked.
ExpectedStates []ConnectorState
// StateBefore is the state DoWithLock tries to switch to right before the
// function is executed.
StateBefore ConnectorState
// StateAfter is the state DoWithLock tries to switch to after the function
// returned without an error.
StateAfter ConnectorState
// WaitForExpectedState controls what happens if the current state is not
// in ExpectedStates: if true DoWithLock waits for one of the expected
// states, if false it returns an error.
WaitForExpectedState bool
}

// DoWithLock executes f while the underlying value watcher is locked.
//
// If opts.ExpectedStates is not empty, the current state has to be one of the
// expected states before f is executed. If it is not, DoWithLock either
// returns an error right away or, if opts.WaitForExpectedState is true,
// unlocks the watcher and watches it until the state becomes one of the
// expected states. An error returned by Watch is returned as is.
//
// Right before f is called the state is switched to opts.StateBefore, and
// after f returns nil it is switched to opts.StateAfter. Both switches go
// through swap, so they are only applied if they increase the state. f
// receives the state that was current before the switch to opts.StateBefore.
// If f returns an error the state is set to StateErrored and the error is
// returned.
func (w *ConnectorStateWatcher) DoWithLock(
ctx context.Context,
opts DoWithLockOptions,
f func(currentState ConnectorState) error,
) error {
vw := (*csync.ValueWatcher[ConnectorState])(w)
lockedWatcher := vw.Lock()
locked := true // keep track if the lock is still locked
// the deferred function reads locked and lockedWatcher when it runs, so it
// unlocks whichever locked watcher is held at return time (if any)
defer func() {
if locked {
lockedWatcher.Unlock()
}
}()

currentState := lockedWatcher.Get()

if len(opts.ExpectedStates) > 0 {
// loop until the state observed under lock is one of the expected states
for !slices.Contains(opts.ExpectedStates, currentState) {
if !opts.WaitForExpectedState {
return fmt.Errorf("expected connector state %q, actual connector state is %q", opts.ExpectedStates, currentState)
}
// the lock is released before Watch is called and acquired again after
// it returns
lockedWatcher.Unlock()
lockedWatcher = nil // discard locked watcher after unlock
locked = false // prevent another unlock in defer

_, err := vw.Watch(ctx, csync.WatchValues(opts.ExpectedStates...))
if err != nil {
return err
}

// lock watcher again and check current state in case it changed between
// watch and the second lock
lockedWatcher = vw.Lock()
locked = true
currentState = lockedWatcher.Get()
}
}

// the result of swap is ignored, f is executed even if the state was not
// changed to StateBefore
w.swap(lockedWatcher, opts.StateBefore)

err := f(currentState)
if err != nil {
// set the state directly, without the "only increase" check in swap
lockedWatcher.Set(StateErrored)
return err
}

w.swap(lockedWatcher, opts.StateAfter)
return nil
}

// Set locks the watcher and tries to switch it to newState. It returns true
// if the state was changed and false if the current state is already equal to
// or higher than newState (in which case the state is left untouched).
func (w *ConnectorStateWatcher) Set(newState ConnectorState) bool {
	vw := (*csync.ValueWatcher[ConnectorState])(w)

	lvw := vw.Lock()
	defer lvw.Unlock()

	return w.swap(lvw, newState)
}

// swap stores newState in the supplied locked watcher, but only if newState is
// strictly higher than the state currently stored. It returns true if the
// state was updated and false otherwise.
func (w *ConnectorStateWatcher) swap(lvw *csync.LockedValueWatcher[ConnectorState], newState ConnectorState) bool {
	// states can only increase
	if current := lvw.Get(); newState > current {
		lvw.Set(newState)
		return true
	}
	return false
}
44 changes: 44 additions & 0 deletions internal/connectorstate_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/csync/opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func applyAndRemoveCtxOptions(ctx context.Context, opts []Option) (context.Conte
ctx, cancel = ctxOpt.applyCtx(ctx)
cancelFns = append(cancelFns, cancel)
}

return ctx, func() {
// call cancel functions in reverse
for i := len(cancelFns) - 1; i >= 0; i-- {
Expand Down
4 changes: 2 additions & 2 deletions internal/csync/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
// that don't take a context and can potentially block the execution forever.
func Run(ctx context.Context, fn func(), opts ...Option) error {
ctx, cancel, opts := applyAndRemoveCtxOptions(ctx, opts)
defer cancel()
if len(opts) > 0 {
panic(fmt.Sprintf("invalid option type: %T", opts[0]))
return fmt.Errorf("invalid option type: %T", opts[0])
}
defer cancel()

done := make(chan struct{})
go func() {
Expand Down
55 changes: 55 additions & 0 deletions internal/csync/run_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package csync

import (
"context"
"testing"
"time"

"github.com/matryer/is"
)

// TestRun_Success asserts that Run executes the supplied function and returns
// no error when called with a background context.
func TestRun_Success(t *testing.T) {
	is := is.New(t)

	ran := false
	err := Run(context.Background(), func() { ran = true })

	is.NoErr(err)
	is.True(ran)
}

// TestRun_Canceled asserts that Run returns context.Canceled when it is called
// with a context that is already canceled.
func TestRun_Canceled(t *testing.T) {
	is := is.New(t)

	// cancel the context before Run is called
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	// run function that blocks for 1 second
	blockingFn := func() { time.Sleep(time.Second) }

	err := Run(ctx, blockingFn)
	is.Equal(err, context.Canceled)
}

// TestRun_DeadlineReached asserts that Run returns context.DeadlineExceeded
// when the function runs longer than the timeout passed with WithTimeout, and
// that Run does not return before that timeout has elapsed.
func TestRun_DeadlineReached(t *testing.T) {
	is := is.New(t)

	const timeout = 100 * time.Millisecond
	// function that blocks for 1 second, i.e. longer than the timeout
	slowFn := func() { time.Sleep(time.Second) }

	startedAt := time.Now()
	err := Run(context.Background(), slowFn, WithTimeout(timeout))
	elapsed := time.Since(startedAt)

	is.Equal(err, context.DeadlineExceeded)
	is.True(elapsed >= timeout)
}
Loading

0 comments on commit 4d9c681

Please sign in to comment.