Skip to content

Commit

Permalink
improve states
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Nov 7, 2023
1 parent e8713aa commit b9c09bf
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 29 deletions.
10 changes: 2 additions & 8 deletions internal/connectorstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ const (
StateConfigured
StateStarting
StateStarted
StateInitiatingRun
StateRunning
StateInitiatingStop
StateStopping
StateStopped
StateTearingDown
Expand Down Expand Up @@ -100,14 +102,6 @@ func (w *ConnectorStateWatcher) DoWithLock(
return nil
}

func (w *ConnectorStateWatcher) CheckErrorAndSwap(err error, newState ConnectorState) bool {
if err != nil {
return w.Set(StateErrored)
} else {
return w.Set(newState)
}
}

func (w *ConnectorStateWatcher) Set(newState ConnectorState) bool {
lockedWatcher := (*csync.ValueWatcher[ConnectorState])(w).Lock()
defer lockedWatcher.Unlock()
Expand Down
18 changes: 10 additions & 8 deletions internal/connectorstate_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 17 additions & 13 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (a *sourcePluginAdapter) Start(ctx context.Context, req cpluginv1.SourceSta
func (a *sourcePluginAdapter) Run(ctx context.Context, stream cpluginv1.SourceRunStream) (err error) {
err = a.state.DoWithLock(ctx, internal.DoWithLockOptions{
ExpectedStates: []internal.ConnectorState{internal.StateStarted},
StateBefore: internal.StateRunning, // TODO create another state
StateBefore: internal.StateInitiatingRun,
StateAfter: internal.StateRunning,
WaitForExpectedState: false,
}, func(_ internal.ConnectorState) error {
Expand All @@ -218,7 +218,13 @@ func (a *sourcePluginAdapter) Run(ctx context.Context, stream cpluginv1.SourceRu
return err
}

defer func() { a.state.CheckErrorAndSwap(err, internal.StateStopped) }()
defer func() {
if err != nil {
a.state.Set(internal.StateErrored)
} else {
a.state.Set(internal.StateStopped)
}
}()

<-a.t.Dying() // stop as soon as it's dying
return a.t.Err()
Expand Down Expand Up @@ -287,9 +293,9 @@ func (a *sourcePluginAdapter) Stop(ctx context.Context, _ cpluginv1.SourceStopRe
ExpectedStates: []internal.ConnectorState{
internal.StateRunning, internal.StateStopping, internal.StateTornDown, internal.StateErrored,
},
StateBefore: internal.StateStopping,
StateAfter: internal.StateStopping, // TODO create another state?
WaitForExpectedState: true, // wait for one of the expected states
StateBefore: internal.StateInitiatingStop,
StateAfter: internal.StateStopping,
WaitForExpectedState: true, // wait for one of the expected states
}, func(state internal.ConnectorState) error {
if state != internal.StateRunning {
// stop already executed or we errored out, in any case we don't do anything
Expand Down Expand Up @@ -320,10 +326,11 @@ func (a *sourcePluginAdapter) Stop(ctx context.Context, _ cpluginv1.SourceStopRe
}

func (a *sourcePluginAdapter) Teardown(ctx context.Context, _ cpluginv1.SourceTeardownRequest) (cpluginv1.SourceTeardownResponse, error) {
var waitErr error // store waitErr
err := a.state.DoWithLock(ctx, internal.DoWithLockOptions{
ExpectedStates: nil, // Teardown can be called from any state
StateBefore: internal.StateTearingDown,
StateAfter: internal.StateTornDown, // TODO set state regardless of error
StateAfter: internal.StateTornDown,
}, func(state internal.ConnectorState) error {
// cancel open and read context, in case Stop was not called (can happen in
// case the stop was triggered by an error)
Expand All @@ -337,7 +344,6 @@ func (a *sourcePluginAdapter) Teardown(ctx context.Context, _ cpluginv1.SourceTe
a.readCancel()
}

var waitErr error
if a.t != nil {
waitErr = a.waitForRun(ctx, teardownTimeout) // wait for Run to stop running
if waitErr != nil {
Expand All @@ -348,14 +354,12 @@ func (a *sourcePluginAdapter) Teardown(ctx context.Context, _ cpluginv1.SourceTe
}
}

err := a.impl.Teardown(ctx)
if err != nil {
return err
}

return waitErr
return a.impl.Teardown(ctx)
})

if err == nil {
err = waitErr
}
return cpluginv1.SourceTeardownResponse{}, err
}

Expand Down

0 comments on commit b9c09bf

Please sign in to comment.