-
Notifications
You must be signed in to change notification settings - Fork 569
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability to handle context cancellations for TCP protocol #1389
Conversation
…ng ops on that connection will be properly released.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I will do a small testing on my side and then we can merge.
conn.go
Outdated
if c.closed { | ||
err := errors.New("attempted sending on closed connection") | ||
c.debugf("[send data] err: %v", err) | ||
return err | ||
} | ||
|
||
if c.buffer == nil { | ||
err := errors.New("attempted sending on nil buffer") | ||
c.debugf("[send data] err: %v", err) | ||
return err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it something you found during debugging or presumably want to safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, this is outdated code, please check the latest changes
https://github.com/ClickHouse/clickhouse-go/pull/1389/files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, safety is not presumed, it's necessary here.
previously, in conn_process.go access to c.close and c.buffer was single-threaded.
now i'm reading data in background goroutine and in foreground thread i'm waiting on select:
// do reads in background
errCh := make(chan error, 1)
doneCh := make(chan bool, 1)
go func() {
err := c.processImpl(ctx, on) // this accesses c.reader internally (reads data)
if err != nil {
errCh <- err
return
}
doneCh <- true
}()
// select on context or read channel (errors)
select {
case <-ctx.Done():
c.cancel() // calls c.close(), it accesses c.reader internally (sets c.reader to nil)
return ctx.Err()
case err := <-errCh:
return err
case <-doneCh:
return nil
}
-
Closing connections must be done very carefully. Currently conn.Close() is broken because multiple calls to conn.Close() have issues deadlocking the thread it was called from
-
Also multiple calls to c.close() from multiple threads is not thread safe, doing so will result in data races.
So, this needed to be fixed from this:
func (c *connect) close() error {
if c.closed {
return nil
}
c.closed = true
<...>
To a thread-safe version of it
func (c *connect) close() error {
c.closeMutex.Lock()
if c.closed {
c.closeMutex.Unlock()
return nil
}
c.closed = true
c.closeMutex.Unlock()
<...>
This guarantees that under no circumstances c.close() can be called twice no matter what.
-
Besides this we need to guarantee threadsafe access to c.reader from c.processImpl(), c.cancel() and c.close(), hence I've added c.readerMutex()
-
One more thing: the order of operations during closing a connection:
Old version:
func (c *connect) close() error {
c.closeMutex.Lock()
if c.closed {
c.closeMutex.Unlock()
return nil
}
c.closed = true
c.closeMutex.Unlock()
c.buffer = nil
c.readerMutex.Lock()
c.reader = nil
c.readerMutex.Unlock()
if err := c.conn.Close(); err != nil {
return err
}
return nil
}
As you can see here, we're changing c.buffer and c.reader that might be currently reading/sending data, before we close the network connection. Connection must be closed first, in order to stop any i/o operations on c.buffer and c.reader (and related contexts), then it's safe to deallocale c.buffer and c.reader.
func (c *connect) close() error {
c.closeMutex.Lock()
if c.closed {
c.closeMutex.Unlock()
return nil
}
c.closed = true
c.closeMutex.Unlock()
if err := c.conn.Close(); err != nil {
return err
}
c.buffer = nil
c.readerMutex.Lock()
c.reader = nil
c.readerMutex.Unlock()
return nil
}
conn.go
Outdated
@@ -131,6 +139,8 @@ type connect struct { | |||
readTimeout time.Duration | |||
blockBufferSize uint8 | |||
maxCompressionBuffer int | |||
mutex sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you think it makes sense to name it explicit?
mutex sync.Mutex | |
readerMutex sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, good idea
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
conn.go
Outdated
if c.isClosed() { | ||
err := errors.New("attempted sending on closed connection") | ||
c.debugf("[send data] err: %v", err) | ||
return err | ||
} | ||
|
||
if c.buffer == nil { | ||
err := errors.New("attempted sending on nil buffer") | ||
c.debugf("[send data] err: %v", err) | ||
return err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it something you found when while debugging or only added for safety?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, this is outdated code, please check the latest changes
https://github.com/ClickHouse/clickhouse-go/pull/1389/files
it was all found during debugging, yes.
note: if c.buffer == nil {
is already removed in latest commit, it was incorrect assumption on my side, trying to solve synchronisation issues.
if c.isClosed() {
check is necessary. We can't perform read/send operations on closed connections
I conducted basic tests. This will be merged with next minor release in two weeks. |
….29.0 (#35397) This PR contains the following updates: | Package | Change | Age | Adoption | Passing | Confidence | |---|---|---|---|---|---| | [github.com/ClickHouse/clickhouse-go/v2](https://redirect.github.com/ClickHouse/clickhouse-go) | `v2.28.3` -> `v2.29.0` | [![age](https://developer.mend.io/api/mc/badges/age/go/github.com%2fClickHouse%2fclickhouse-go%2fv2/v2.29.0?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://developer.mend.io/api/mc/badges/adoption/go/github.com%2fClickHouse%2fclickhouse-go%2fv2/v2.29.0?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://developer.mend.io/api/mc/badges/compatibility/go/github.com%2fClickHouse%2fclickhouse-go%2fv2/v2.28.3/v2.29.0?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://developer.mend.io/api/mc/badges/confidence/go/github.com%2fClickHouse%2fclickhouse-go%2fv2/v2.28.3/v2.29.0?slim=true)](https://docs.renovatebot.com/merge-confidence/) | --- > [!WARNING] > Some dependencies could not be looked up. Check the Dependency Dashboard for more information. --- ### Release Notes <details> <summary>ClickHouse/clickhouse-go (github.com/ClickHouse/clickhouse-go/v2)</summary> ### [`v2.29.0`](https://redirect.github.com/ClickHouse/clickhouse-go/blob/HEAD/CHANGELOG.md#v2290-2024-09-24----Release-notes-generated-using-configuration-in-githubreleaseyml-at-main---) [Compare Source](https://redirect.github.com/ClickHouse/clickhouse-go/compare/v2.28.3...v2.29.0) #### What's Changed ##### Enhancements 🎉 - Add ability to handle context cancellations for TCP protocol by [@​tinybit](https://redirect.github.com/tinybit) in [https://github.com/ClickHouse/clickhouse-go/pull/1389](https://redirect.github.com/ClickHouse/clickhouse-go/pull/1389) ##### Other Changes 🛠 - Add Examples for batch.Column(n).AppendRow in columnar_insert.go by [@​achmad-dev](https://redirect.github.com/achmad-dev) in [https://github.com/ClickHouse/clickhouse-go/pull/1410](https://redirect.github.com/ClickHouse/clickhouse-go/pull/1410) #### New Contributors - [@​achmad-dev](https://redirect.github.com/achmad-dev) made their first contribution in [https://github.com/ClickHouse/clickhouse-go/pull/1410](https://redirect.github.com/ClickHouse/clickhouse-go/pull/1410) - [@​tinybit](https://redirect.github.com/tinybit) made their first contribution in [https://github.com/ClickHouse/clickhouse-go/pull/1389](https://redirect.github.com/ClickHouse/clickhouse-go/pull/1389) **Full Changelog**: ClickHouse/clickhouse-go@v2.28.3...v2.29.0 </details> --- ### Configuration 📅 **Schedule**: Branch creation - "on tuesday" (UTC), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, check this box --- This PR was generated by [Mend Renovate](https://mend.io/renovate/). View the [repository job log](https://developer.mend.io/github/open-telemetry/opentelemetry-collector-contrib). <!--renovate-debug:eyJjcmVhdGVkSW5WZXIiOiIzOC44MC4wIiwidXBkYXRlZEluVmVyIjoiMzguODAuMCIsInRhcmdldEJyYW5jaCI6Im1haW4iLCJsYWJlbHMiOlsiZGVwZW5kZW5jaWVzIiwicmVub3ZhdGVib3QiXX0=--> --------- Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Co-authored-by: opentelemetrybot <[email protected]> Co-authored-by: Yang Song <[email protected]>
Summary
Issue:
Checklist
Delete items not relevant to your PR: