-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Failover Connector PR2 - core failover functionality #29557
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A lot of my feedback is surface level style nits as I try to understand the implementation. Some of my questions may have obvious answers but I'm having a very difficult time reasoning about the code because there are perhaps a few too many layers of abstraction for me to wrap my head around immediately.
if err == nil { | ||
consumers = append(consumers, newConsumer) | ||
} else { | ||
return errConsumer | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: prefer to handle error case and return immediately rather than deeper nesting
if err == nil { | |
consumers = append(consumers, newConsumer) | |
} else { | |
return errConsumer | |
} | |
if err != nil { | |
return errConsumer | |
} | |
consumers = append(consumers, newConsumer) |
|
||
func (f *failoverRouter[C]) registerConsumers() error { | ||
consumers := make([]C, 0) | ||
for _, pipeline := range f.cfg.PipelinePriority { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, since we expect potentially multiple pipelines per level
for _, pipeline := range f.cfg.PipelinePriority { | |
for _, pipelines := range f.cfg.PipelinePriority { |
if f.pS.handleErrorRetryCheck() { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
f.cancelRetry.invokeCancel() | ||
f.cancelRetry.updateCancelFunc(cancel) | ||
f.enableRetry(ctx) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
if f.pS.handleErrorRetryCheck() { | |
ctx, cancel := context.WithCancel(context.Background()) | |
f.cancelRetry.invokeCancel() | |
f.cancelRetry.updateCancelFunc(cancel) | |
f.enableRetry(ctx) | |
} | |
if !f.pS.handleErrorRetryCheck() { | |
return | |
} | |
ctx, cancel := context.WithCancel(context.Background()) | |
f.cancelRetry.invokeCancel() | |
f.cancelRetry.updateCancelFunc(cancel) | |
f.enableRetry(ctx) |
Hey @djaglowski, let me address all your comments and maybe it would be helpful for me to add some more verbose comments in the source |
Hey @djaglowski, I made some updates and added additional comments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@akats7, thanks for iterating on this.
The primary consume path seems a lot more clear to me, but I'm still struggling to reconcile the way we are managing indexes.
connector/failoverconnector/internal/state/pipeline_selector.go
Outdated
Show resolved
Hide resolved
connector/failoverconnector/internal/state/pipeline_selector.go
Outdated
Show resolved
Hide resolved
connector/failoverconnector/internal/state/pipeline_selector.go
Outdated
Show resolved
Hide resolved
connector/failoverconnector/internal/state/pipeline_selector.go
Outdated
Show resolved
Hide resolved
connector/failoverconnector/internal/state/pipeline_selector.go
Outdated
Show resolved
Hide resolved
@djaglowski Can you remove the connector/routing tag, looks like I accidentally modified a file in a commit |
This is the 2nd PR for the failover connector that implements the core failover functionality. It is currently in place for Traces and once solidified will be repeated for metrics and logs
Link to tracking Issue: #20766
Note: Will add traces tests today but pushing up to begin review
cc: @djaglowski @fatsheep9146