fn: add goroutine manager #9141

Merged 1 commit on Oct 5, 2024

starius
Collaborator

@starius starius commented Sep 27, 2024

Change Description

The package provides type GoroutineManager which is used to launch goroutines until context expires or the manager is stopped. Stop method blocks until all started goroutines stop.

Original code by Andras https://go.dev/play/p/HhRpE-K2lA0

Adjustments and tests by Boris.

I also added another unrelated fn fix to this PR: fn: generalize type of t in UnwrapOrFail.

Steps to Test

CI runs the unit tests. I also used GoroutineManager in #9140.

Pull Request Checklist

Testing

  • Your PR passes all CI checks.
  • Tests covering the positive and negative (error paths) are included.
  • Bug fixes contain tests triggering the bug to prevent regressions.

Code Style and Documentation

Contributor

coderabbitai bot commented Sep 27, 2024

Important

Review skipped

Auto reviews are limited to specific labels.

🏷️ Labels to auto review (1)
  • llm-review

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Collaborator

@ziggie1984 ziggie1984 left a comment

Looks good.

fn/goroutine_manager.go (outdated)
func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
g.mu.Lock()
defer g.mu.Unlock()

Collaborator

Nit: add a comment that calling Add() while a Wait() is already active will cause the Go runtime to panic?

Collaborator Author

Done. Also added a similar comment in GoroutineManager.Stop method.

)

// TestGoroutineManager tests that GoroutineManager starts goroutines, until ctx
// expires fails to start after it expires and waits for already started
Collaborator

Nit: // TestGoroutineManager tests that the GoroutineManager starts goroutines until ctx
// expires. It also makes sure it fails to start new goroutines after the context expired and the GoroutineManager is in the process of waiting for already started goroutines in the Stop method.

Collaborator Author

Done. Thanks!

fn/goroutine_manager_test.go

stopChan := make(chan struct{})

time.AfterFunc(1*time.Millisecond, func() {
Collaborator

So does the panic happen in the Go runtime only if the counter is actually 0 and we call Add(), or does that not matter and the panic happens as soon as Add() is called after Wait() was called?

Collaborator Author

From https://pkg.go.dev/sync#WaitGroup.Add

Note that calls with a positive delta that occur when the counter is zero must happen before a Wait. Calls with a negative delta, or calls with a positive delta that start when the counter is greater than zero, may happen at any time.

So if the counter is 0, Add(1) and Wait() must not be called in parallel.
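
A small sketch of the usual way to respect this rule when a single goroutine owns the WaitGroup: call `Add(1)` in the launching goroutine, before `go` and before `Wait()`, never inside the new goroutine. The function name `runAll` is hypothetical and only serves the illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll launches n tasks and returns how many of them finished.
func runAll(n int) int64 {
	var wg sync.WaitGroup
	var finished atomic.Int64

	for i := 0; i < n; i++ {
		// The counter may be zero here, so this Add must happen
		// before Wait. Doing it in the launching goroutine
		// guarantees that ordering.
		wg.Add(1)
		go func() {
			defer wg.Done()
			finished.Add(1)
		}()
	}

	// All Add calls have already happened, so Wait cannot miss a task.
	wg.Wait()

	return finished.Load()
}

func main() {
	fmt.Println(runAll(10)) // prints 10
}
```

If `wg.Add(1)` were moved inside the new goroutine, `Wait()` could observe a zero counter and return before any task had started.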

Collaborator

I checked the code, and it even panics if you call Add() while Wait() is already waiting for all goroutines to finish; in that case Wait() will panic. In your example above, Add() will panic if the counter is zero and we already have waiters registered.

Collaborator Author

> it even panics if you call Add() while Wait is already waiting for all goroutine to finish

Even with a non-zero counter?

If the counter is 0, Wait() should unblock immediately, not wait.

Collaborator

Yes, true. What you are referring to is basically the case where Wait() has already returned: before returning, Wait() checks whether the state was changed, and if it was, it panics.

@starius starius marked this pull request as ready for review September 28, 2024 04:12
@starius starius force-pushed the goroutines2 branch 2 times, most recently from 8ff1c2a to a47a35e Compare September 30, 2024 16:41
@starius starius requested a review from ziggie1984 September 30, 2024 19:34
Collaborator

@ziggie1984 ziggie1984 left a comment

Missing doc to disincentivize the use of this function, otherwise LGTM.

cancel func()
}

// NewGoroutineManager constructs and returns a new instance of
Collaborator

Regarding the GoroutineManager: it's good that we are introducing it, but I think we should disincentivize its use, because there is something wrong with the design if you need to use it. Maybe we should note this in the comment.

Collaborator Author

Fixed. Added the following note:

// NOTE: while it is safe to use GoroutineManager, it is recommended to change
// the design of the code using it to avoid launching goroutines except at start
// time. Ideally, goroutine launches and Stop() call should not compete.

Collaborator

OK I have to chime in here. Why are we adding an abstraction that we are explicitly discouraging? Seems like this will be counterproductive in the long run.

Collaborator

Hmm, OK, so you are actually fine with using the WaitGroup this way? I thought it was a band-aid for the current situation, to avoid rewriting the whole htlc switch/payment code. But using the WaitGroup in that way is not good design imo, because the WaitGroup is inherently thread-safe and generally does not need a mutex when it is ensured that Add() calls happen before Wait().

Collaborator

My point is that building infrastructure to support bad code design is just weird. We should fix the design or patch over it. We don't want to make it easier to write bad code in the future.

Collaborator Author

The common scenario is like this:

Some type 
  - New() returns new instance of the type
  - Some method starts an auxiliary goroutine which may keep working after the method completes
   (though this is not needed to have the problem with goroutines)
  - Method Stop() instructs all such background running workers to stop and waits for them to
   actually stop to avoid goroutine leak

(It is not only the htlc switch; it is also found in ChannelRouter.SendPaymentAsync, and maybe in other cases too.)

In such a scenario, it is not thread-safe to track goroutines with a WaitGroup without a mutex.

But is this always a bad scenario which we should discourage in favor of a design using a fixed number of goroutines started in New() and stopped in Stop()? The latter scenario is thread-safe for an unprotected WaitGroup.

Collaborator

I don't think the issue is with a fixed vs. variable number of goroutines. This issue arises from having multiple threads responsible for managing the WaitGroup. Sending requests into some object's main event loop (which is responsible for managing the WaitGroup) and having it launch the new goroutine would handily solve the issue. However, right now we cannot assume that the methods of an object are called from the same thread, which is what causes the problem.

This brings me to a new theorem of mine: WaitGroups that manage a runtime determined number of threads should only exist in the stack memory of a goroutine, not in a field on some object. If the number of goroutines is statically known at compile time, then it may be on the field of the object.
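
One possible reading of this rule, sketched below: the WaitGroup is a local variable of the event-loop goroutine, and other goroutines only send requests to the loop over a channel. The type `EventLoop` and its methods are hypothetical names for illustration and do not exist in lnd.

```go
package main

import (
	"fmt"
	"sync"
)

// EventLoop is a hypothetical type: all goroutine launches go through a
// single loop goroutine, which is the only one touching the WaitGroup.
type EventLoop struct {
	tasks    chan func()
	quit     chan struct{}
	finished chan struct{}
	stopOnce sync.Once
}

func NewEventLoop() *EventLoop {
	l := &EventLoop{
		tasks:    make(chan func()),
		quit:     make(chan struct{}),
		finished: make(chan struct{}),
	}
	go l.run()

	return l
}

func (l *EventLoop) run() {
	defer close(l.finished)

	// The WaitGroup is local to this goroutine rather than a struct
	// field. Only this goroutine calls Add and Wait, so the two can
	// never run concurrently.
	var wg sync.WaitGroup

	for {
		select {
		case task := <-l.tasks:
			wg.Add(1)
			go func() {
				defer wg.Done()
				task()
			}()

		case <-l.quit:
			wg.Wait()
			return
		}
	}
}

// Submit asks the loop to launch task. It returns false once the loop has
// been stopped.
func (l *EventLoop) Submit(task func()) bool {
	select {
	case l.tasks <- task:
		return true
	case <-l.quit:
		return false
	}
}

// Stop signals the loop and blocks until every launched task has returned.
func (l *EventLoop) Stop() {
	l.stopOnce.Do(func() { close(l.quit) })
	<-l.finished
}

func main() {
	l := NewEventLoop()
	results := make(chan int, 3)
	for i := 0; i < 3; i++ {
		i := i
		l.Submit(func() { results <- i })
	}
	l.Stop()

	fmt.Println(len(results), l.Submit(func() {})) // prints: 3 false
}
```

No mutex is needed here because `Submit` and `Stop` may be called from any goroutine while the WaitGroup itself is never shared.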

Collaborator Author

I like the theorem! It is a good rule resulting in cleaner code. Also I think it is possible to enforce it at compile time (lint rule?).

Collaborator

I don't know how to write lint rules right now but if that's possible that sounds great!

Collaborator Author

I removed the note.


// Done returns a channel which is closed when either the context passed to
// NewGoroutineManager expires or when Stop is called.
func (g *GoroutineManager) Done() <-chan struct{} {

// Make sure new goroutines do not start after Stop.
require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)

// When Stop() is called, m.Done() is closed. Test that it is closed.
Collaborator

Nit: Test that the internal context is closed.

Collaborator Author

Fixed


@Roasbeef Roasbeef merged commit e2c97ed into lightningnetwork:master Oct 5, 2024
19 of 21 checks passed
@starius starius deleted the goroutines2 branch October 7, 2024 15:55
@guggero
Collaborator

guggero commented Nov 11, 2024

Looking at this a bit more closely while reviewing #9253.

The defer g.wg.Done() within GoroutineManager.Go isn't protected by the mutex anymore, since it's running in a Goroutine. But Done() internally calls Add(-1), so we still have a potential race condition with Wait().

@ziggie1984
Collaborator

Yes I think you are right, if we call Add(-1) while also adding a goroutine concurrently it will panic, wonder if there could be a fix for this @starius

from the waitgroup impl. we would panic:

	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	if wg.state.Load() != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

@starius
Collaborator Author

starius commented Nov 14, 2024

@guggero @ziggie1984 Thanks for looking into it!

The documentation of sync.WaitGroup.Add has the following part:

Calls with a negative delta, or calls with a positive delta that start when the counter is greater than zero, may happen at any time.

I guess this means that Add(-1) (= Done()) may happen at any time, even in parallel with Wait(). Intuitively this also makes sense, because logically it is not a race condition: the outcome does not depend on the order of the Done() and Wait() calls, as opposed to a situation where Add(1) and Wait() are called in parallel. In the latter case, the outcome depends on the order (whether Wait() should block or return immediately).

I also created a stress test testing GoroutineManager trying to catch this race condition, if it exists:

// TestGoroutineManageStopsStress launches many Stop() calls in parallel with a
// task exiting. It attempts to catch a race condition between wg.Done() and
// wg.Wait() calls. According to the documentation of wg.Add() this is
// acceptable, therefore this test passes even with -race.
func TestGoroutineManageStopsStress(t *testing.T) {
        t.Parallel()

        m := NewGoroutineManager(context.Background())

        // jobChan is used to make the task to finish.
        jobChan := make(chan struct{})

        // Start a task and wait inside it until we start calling Stop() method.
        err := m.Go(func(ctx context.Context) {
                <-jobChan
        })
        require.NoError(t, err)

        // Now launch many goroutines calling Stop() method in parallel.
        var wg sync.WaitGroup
        for i := 0; i < 100; i++ {
                wg.Add(1)
                go func() {
                        defer wg.Done()
                        m.Stop()
                }()
        }

        // Exit the task in parallel with Stop() calls.
        close(jobChan)

        // Wait until all the Stop() calls complete.
        wg.Wait()
}

The test passes under -race.

I think, that the current implementation is correct.

starius added a commit to starius/lnd that referenced this pull request Nov 14, 2024
Make sure there is no race condition between Done() and Wait() methods in the
GoroutineManager implementation.

See lightningnetwork#9141 (comment)
@Crypt-iQ
Collaborator

@starius is it allowed to call wg.Add(1) at the same time as wg.Wait() if the wg counter > 0?

@starius
Collaborator Author

starius commented Nov 14, 2024

@Crypt-iQ From "or calls with a positive delta that start when the counter is greater than zero" part this should be allowed.
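
A minimal sketch of that allowed case, under the assumption that the goroutine calling `Add(1)` is itself still counted by the WaitGroup, so the counter is provably greater than zero at that moment. The function name `runWithSubtasks` is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWithSubtasks starts one parent task that spawns n subtasks while the
// caller may already be blocked in Wait. It returns the number of subtasks
// that ran.
func runWithSubtasks(n int) int64 {
	var wg sync.WaitGroup
	var count atomic.Int64

	// Counter goes 0 -> 1 here, strictly before Wait is called.
	wg.Add(1)
	go func() {
		// The parent's Done is deferred, so the counter stays above
		// zero for as long as the loop below is adding subtasks.
		defer wg.Done()

		for i := 0; i < n; i++ {
			// Positive delta with counter > 0: allowed at any
			// time, even concurrently with Wait.
			wg.Add(1)
			go func() {
				defer wg.Done()
				count.Add(1)
			}()
		}
	}()

	wg.Wait()

	return count.Load()
}

func main() {
	fmt.Println(runWithSubtasks(5)) // prints 5
}
```

The pattern breaks as soon as the adding goroutine is not itself counted, because then the counter may drop to zero between two `Add(1)` calls.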

@guggero
Collaborator

guggero commented Nov 14, 2024

@starius thanks for the explanation, I missed that part in the doc. It indeed looks like the current implementation is correct then!

@ziggie1984
Collaborator

I think there is still a possibility of a race but not sure how to trigger it via tests:

Imagine Done() is called and brings the counter to 0 while, at the same time, Wait() is called (it started before the counter reached 0). Then you cannot be sure that the semaphore will release the waiter: the implementation would panic anyway, and even if it did not panic, fewer semaphore release calls would be triggered than there are waiters.

if wg.state.Load() != state {
	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
wg.state.Store(0)
for ; w != 0; w-- {
	runtime_Semrelease(&wg.sema, false, 0)
}

This is part of the Add implementation

@starius
Collaborator Author

starius commented Nov 14, 2024

@ziggie1984 If parallel Done() and Wait() calls result in a panic, this is a bug in Go, because this is allowed according to the documentation. If it works as stated in the docs, then the eventual outcome is Wait() returning soon (either immediately or after waiting a short time until Done() unblocks it).

I'll try to look deeper into the code implementing Add(). It has many branches depending on the sign of delta. I suspect one of them disables the panic in the case of a negative delta, but I need to check this more closely.

@ziggie1984
Collaborator

> @ziggie1984 If Done() and Wait() parallel calls result in a panic, this is a bug in Go, because this is allowed according to the documentation.

The docs clearly say "when the counter is greater than zero", and I was referring exactly to the case where Wait() is called concurrently with the last goroutine calling the Done() method. Because then we run into the issue that this done method will never be notified. I analysed the WaitGroup and I am really convinced that this would trigger the panic linked above. But this panic seems rather unlikely if even the race detector cannot trigger it.

@starius
Collaborator Author

starius commented Nov 14, 2024

@ziggie1984 I think that "when the counter is greater than zero" part applies only to "calls with a positive delta", not to "Calls with a negative delta" part in that sentence.

If counter=0, this means Add(-1) has already been applied. Doesn't this mean, that Add(-1) precedes Wait() in this case?

> Because then we run into the issue that this done method will never be notified.

Can you elaborate on this, please? If Wait() sees counter=0, it doesn't need to be notified, because it returns immediately in that case.

While writing the response, I made a small table version of how I understand the rules of using Add():

| delta    | counter = 0   | counter > 0 |
|----------|---------------|-------------|
| positive | before a Wait | at any time |
| negative | not allowed   | at any time |

Add(-1) is not allowed when counter=0, because then the counter would become negative.

@starius
Collaborator Author

starius commented Nov 14, 2024

@ziggie1984 Also check example of WaitGroup, please.
https://pkg.go.dev/sync#example-WaitGroup

They call wg.Done() from a goroutine and it can run at the same time with Wait(). If this is not allowed, then the example is wrong and it is a bug in Go.
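
The same situation can be reproduced without GoroutineManager. The sketch below uses only `sync.WaitGroup` from the standard library; the function name `waitersReleased` is hypothetical. Many goroutines call `Wait()` while a single `Done()` arrives in parallel, and every waiter is expected to be released.

```go
package main

import (
	"fmt"
	"sync"
)

// waitersReleased starts one task and n goroutines that call Wait on the
// same WaitGroup, then lets the task finish. It returns how many waiters
// were released.
func waitersReleased(n int) int {
	var wg sync.WaitGroup

	// Counter goes 0 -> 1 before any Wait is called.
	wg.Add(1)
	release := make(chan struct{})
	go func() {
		// Done (a negative delta) may run in parallel with Wait.
		defer wg.Done()
		<-release
	}()

	var (
		waiting  sync.WaitGroup
		mu       sync.Mutex
		released int
	)
	for i := 0; i < n; i++ {
		waiting.Add(1)
		go func() {
			defer waiting.Done()

			// Either blocks until Done runs or returns at once
			// if the counter is already zero.
			wg.Wait()

			mu.Lock()
			released++
			mu.Unlock()
		}()
	}

	// Let the task exit while the waiters are starting or blocked.
	close(release)
	waiting.Wait()

	return released
}

func main() {
	fmt.Println(waitersReleased(100)) // prints 100
}
```

Running this with `go run -race` exercises the same Done/Wait interleavings as the stress test above.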

@ziggie1984
Collaborator

Ahh OK, right, I was a bit misled by this line:

if wg.state.CompareAndSwap(state, state+1) {

So now I completely agree that Done() can be called without the lock held. Thank you for the info.

@ProofOfKeags
Collaborator

After discussing with @ziggie1984 this morning I think this is correct.
