diff --git a/internal/connectorstate.go b/internal/connectorstate.go index 2112862..72b8baa 100644 --- a/internal/connectorstate.go +++ b/internal/connectorstate.go @@ -32,7 +32,9 @@ const ( StateConfigured StateStarting StateStarted + StateInitiatingRun StateRunning + StateInitiatingStop StateStopping StateStopped StateTearingDown @@ -100,14 +102,6 @@ func (w *ConnectorStateWatcher) DoWithLock( return nil } -func (w *ConnectorStateWatcher) CheckErrorAndSwap(err error, newState ConnectorState) bool { - if err != nil { - return w.Set(StateErrored) - } else { - return w.Set(newState) - } -} - func (w *ConnectorStateWatcher) Set(newState ConnectorState) bool { lockedWatcher := (*csync.ValueWatcher[ConnectorState])(w).Lock() defer lockedWatcher.Unlock() diff --git a/internal/connectorstate_string.go b/internal/connectorstate_string.go index 42f367f..d5f0817 100644 --- a/internal/connectorstate_string.go +++ b/internal/connectorstate_string.go @@ -13,26 +13,28 @@ func _() { _ = x[StateConfigured-2] _ = x[StateStarting-3] _ = x[StateStarted-4] - _ = x[StateRunning-5] - _ = x[StateStopping-6] - _ = x[StateStopped-7] - _ = x[StateTearingDown-8] - _ = x[StateTornDown-9] + _ = x[StateInitiatingRun-5] + _ = x[StateRunning-6] + _ = x[StateInitiatingStop-7] + _ = x[StateStopping-8] + _ = x[StateStopped-9] + _ = x[StateTearingDown-10] + _ = x[StateTornDown-11] _ = x[StateErrored-500] } const ( - _ConnectorState_name_0 = "InitialConfiguringConfiguredStartingStartedRunningStoppingStoppedTearingDownTornDown" + _ConnectorState_name_0 = "InitialConfiguringConfiguredStartingStartedInitiatingRunRunningInitiatingStopStoppingStoppedTearingDownTornDown" _ConnectorState_name_1 = "Errored" ) var ( - _ConnectorState_index_0 = [...]uint8{0, 7, 18, 28, 36, 43, 50, 58, 65, 76, 84} + _ConnectorState_index_0 = [...]uint8{0, 7, 18, 28, 36, 43, 56, 63, 77, 85, 92, 103, 111} ) func (i ConnectorState) String() string { switch { - case 0 <= i && i <= 9: + case 0 <= i && i <= 11: return _ConnectorState_name_0[_ConnectorState_index_0[i]:_ConnectorState_index_0[i+1]] case i == 500: return _ConnectorState_name_1 diff --git a/source.go b/source.go index 6567858..905ff7c 100644 --- a/source.go +++ b/source.go @@ -194,7 +194,7 @@ func (a *sourcePluginAdapter) Start(ctx context.Context, req cpluginv1.SourceSta func (a *sourcePluginAdapter) Run(ctx context.Context, stream cpluginv1.SourceRunStream) (err error) { err = a.state.DoWithLock(ctx, internal.DoWithLockOptions{ ExpectedStates: []internal.ConnectorState{internal.StateStarted}, - StateBefore: internal.StateRunning, // TODO create another state + StateBefore: internal.StateInitiatingRun, StateAfter: internal.StateRunning, WaitForExpectedState: false, }, func(_ internal.ConnectorState) error { @@ -218,7 +218,13 @@ func (a *sourcePluginAdapter) Run(ctx context.Context, stream cpluginv1.SourceRu return err } - defer func() { a.state.CheckErrorAndSwap(err, internal.StateStopped) }() + defer func() { + if err != nil { + a.state.Set(internal.StateErrored) + } else { + a.state.Set(internal.StateStopped) + } + }() <-a.t.Dying() // stop as soon as it's dying return a.t.Err() @@ -287,9 +293,9 @@ func (a *sourcePluginAdapter) Stop(ctx context.Context, _ cpluginv1.SourceStopRe ExpectedStates: []internal.ConnectorState{ internal.StateRunning, internal.StateStopping, internal.StateTornDown, internal.StateErrored, }, - StateBefore: internal.StateStopping, - StateAfter: internal.StateStopping, // TODO create another state? - WaitForExpectedState: true, // wait for one of the expected states + StateBefore: internal.StateInitiatingStop, + StateAfter: internal.StateStopping, + WaitForExpectedState: true, // wait for one of the expected states }, func(state internal.ConnectorState) error { if state != internal.StateRunning { // stop already executed or we errored out, in any case we don't do anything @@ -320,10 +326,11 @@ func (a *sourcePluginAdapter) Stop(ctx context.Context, _ cpluginv1.SourceStopRe } func (a *sourcePluginAdapter) Teardown(ctx context.Context, _ cpluginv1.SourceTeardownRequest) (cpluginv1.SourceTeardownResponse, error) { + var waitErr error // store waitErr err := a.state.DoWithLock(ctx, internal.DoWithLockOptions{ ExpectedStates: nil, // Teardown can be called from any state StateBefore: internal.StateTearingDown, - StateAfter: internal.StateTornDown, // TODO set state regardless of error + StateAfter: internal.StateTornDown, }, func(state internal.ConnectorState) error { // cancel open and read context, in case Stop was not called (can happen in // case the stop was triggered by an error) @@ -337,7 +344,6 @@ func (a *sourcePluginAdapter) Teardown(ctx context.Context, _ cpluginv1.SourceTe a.readCancel() } - var waitErr error if a.t != nil { waitErr = a.waitForRun(ctx, teardownTimeout) // wait for Run to stop running if waitErr != nil { @@ -348,14 +354,12 @@ func (a *sourcePluginAdapter) Teardown(ctx context.Context, _ cpluginv1.SourceTe } } - err := a.impl.Teardown(ctx) - if err != nil { - return err - } - - return waitErr + return a.impl.Teardown(ctx) }) + if err == nil { + err = waitErr + } return cpluginv1.SourceTeardownResponse{}, err }