Skip to content

Commit

Permalink
Merge pull request #285 from cloudwego/release/v0.5.0
Browse files Browse the repository at this point in the history
chore: release v0.5.0
  • Loading branch information
joway authored Sep 26, 2023
2 parents e19afcc + 5719b53 commit 0faba6e
Show file tree
Hide file tree
Showing 20 changed files with 468 additions and 179 deletions.
22 changes: 11 additions & 11 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,23 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: "1.20"

- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
Expand All @@ -66,7 +66,7 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v1
uses: github/codeql-action/autobuild@v2

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
Expand All @@ -80,4 +80,4 @@ jobs:
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
uses: github/codeql-action/analyze@v2
26 changes: 13 additions & 13 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
compatibility-test:
strategy:
matrix:
go: [ 1.15, "1.20" ]
go: [ 1.15, "1.21" ]
os: [ X64, ARM64 ]
runs-on: ${{ matrix.os }}
steps:
Expand All @@ -15,12 +15,12 @@ jobs:
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go }}
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-
- name: Unit Test
run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./...
- name: Benchmark
Expand All @@ -33,12 +33,12 @@ jobs:
uses: actions/setup-go@v3
with:
go-version: "1.20"
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-
- name: Build Test
run: go vet -v ./...
style-test:
Expand Down
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ For more information, please refer to [Document](#document).
- Linux, macOS (operating system)

* **Future**
- [multisyscall][multisyscall] supports batch system calls
- [io_uring][io_uring]
- Shared Memory IPC
- Serial scheduling I/O, suitable for pure computing
- TLS
- UDP

Expand Down Expand Up @@ -102,5 +100,4 @@ More benchmarks reference [kitex-benchmark][kitex-benchmark] and [hertz-benchmar
[LinkBuffer]: nocopy_linkbuffer.go
[gopool]: https://github.com/bytedance/gopkg/tree/develop/util/gopool
[mcache]: https://github.com/bytedance/gopkg/tree/develop/lang/mcache
[multisyscall]: https://github.com/cloudwego/multisyscall
[io_uring]: https://github.com/axboe/liburing
3 changes: 0 additions & 3 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ goroutine,大幅增加调度开销。此外,[net.Conn][net.Conn] 没有提
- 支持 Linux,macOS(操作系统)

* **即将开源**
- [multisyscall][multisyscall] 支持批量系统调用
- [io_uring][io_uring]
- Shared Memory IPC
- 串行调度 I/O,适用于纯计算
- 支持 TLS
- 支持 UDP

Expand Down Expand Up @@ -95,5 +93,4 @@ goroutine,大幅增加调度开销。此外,[net.Conn][net.Conn] 没有提
[LinkBuffer]: nocopy_linkbuffer.go
[gopool]: https://github.com/bytedance/gopkg/tree/develop/util/gopool
[mcache]: https://github.com/bytedance/gopkg/tree/develop/lang/mcache
[multisyscall]: https://github.com/cloudwego/multisyscall
[io_uring]: https://github.com/axboe/liburing
68 changes: 38 additions & 30 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type connection struct {
operator *FDOperator
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan struct{}
readTrigger chan error
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
Expand Down Expand Up @@ -319,9 +319,9 @@ var barrierPool = sync.Pool{
// init initialize the connection with options
func (c *connection) init(conn Conn, opts *options) (err error) {
// init buffer, barrier, finalizer
c.readTrigger = make(chan struct{}, 1)
c.readTrigger = make(chan error, 1)
c.writeTrigger = make(chan error, 1)
c.bookSize, c.maxSize = block1k/2, pagesize
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
c.inputBarrier, c.outputBarrier = barrierPool.Get().(*barrier), barrierPool.Get().(*barrier)

Expand Down Expand Up @@ -357,19 +357,12 @@ func (c *connection) initNetFD(conn Conn) {
}

func (c *connection) initFDOperator() {
var op *FDOperator
if c.pd != nil && c.pd.operator != nil {
// reuse operator created at connect step
op = c.pd.operator
} else {
poll := pollmanager.Pick()
op = poll.Alloc()
}
poll := pollmanager.Pick()
op := poll.Alloc()
op.FD = c.fd
op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup
op.Inputs, op.InputAck = c.inputs, c.inputAck
op.Outputs, op.OutputAck = c.outputs, c.outputAck

c.operator = op
}

Expand All @@ -385,9 +378,9 @@ func (c *connection) initFinalizer() {
})
}

func (c *connection) triggerRead() {
func (c *connection) triggerRead(err error) {
select {
case c.readTrigger <- struct{}{}:
case c.readTrigger <- err:
default:
}
}
Expand All @@ -411,10 +404,17 @@ func (c *connection) waitRead(n int) (err error) {
}
// wait full n
for c.inputBuffer.Len() < n {
if !c.IsActive() {
switch c.status(closing) {
case poller:
return Exception(ErrEOF, "wait read")
case user:
return Exception(ErrConnClosed, "wait read")
default:
err = <-c.readTrigger
if err != nil {
return err
}
}
<-c.readTrigger
}
return nil
}
Expand All @@ -429,24 +429,32 @@ func (c *connection) waitReadWithTimeout(n int) (err error) {
}

for c.inputBuffer.Len() < n {
if !c.IsActive() {
// cannot return directly, stop timer before !
switch c.status(closing) {
case poller:
// cannot return directly, stop timer first!
err = Exception(ErrEOF, "wait read")
goto RET
case user:
// cannot return directly, stop timer first!
err = Exception(ErrConnClosed, "wait read")
break
}

select {
case <-c.readTimer.C:
// double check if there is enough data to be read
if c.inputBuffer.Len() >= n {
return nil
goto RET
default:
select {
case <-c.readTimer.C:
// double check if there is enough data to be read
if c.inputBuffer.Len() >= n {
return nil
}
return Exception(ErrReadTimeout, c.remoteAddr.String())
case err = <-c.readTrigger:
if err != nil {
return err
}
continue
}
return Exception(ErrReadTimeout, c.remoteAddr.String())
case <-c.readTrigger:
continue
}
}

RET:
// clean timer.C
if !c.readTimer.Stop() {
<-c.readTimer.C
Expand Down
10 changes: 9 additions & 1 deletion connection_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"sync/atomic"
)

type who int32
type who = int32

const (
none who = iota
Expand Down Expand Up @@ -65,6 +65,14 @@ func (l *locker) isCloseBy(w who) (yes bool) {
return atomic.LoadInt32(&l.keychain[closing]) == int32(w)
}

func (l *locker) status(k key) int32 {
return atomic.LoadInt32(&l.keychain[k])
}

func (l *locker) force(k key, v int32) {
atomic.StoreInt32(&l.keychain[k], v)
}

func (l *locker) lock(k key) (success bool) {
return atomic.CompareAndSwapInt32(&l.keychain[k], 0, 1)
}
Expand Down
53 changes: 37 additions & 16 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,45 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
}
// add new task
var task = func() {
panicked := true
defer func() {
// cannot use recover() here, since we don't want to break the panic stack
if panicked {
c.unlock(processing)
if c.IsActive() {
c.Close()
} else {
c.closeCallback(false, false)
}
}
}()
START:
// `process` must be executed at least once if `isProcessable` in order to cover the `send & close by peer` case.
// Then the loop processing must ensure that the connection `IsActive`.
if isProcessable(c) {
process(c)
}
for c.IsActive() && isProcessable(c) {
// `process` must either eventually read all the input data or actively Close the connection,
// otherwise the goroutine will fall into a dead loop.
var closedBy who
for {
closedBy = c.status(closing)
// close by user or no processable
if closedBy == user || !isProcessable(c) {
break
}
process(c)
}
// Handling callback if connection has been closed.
if !c.IsActive() {
c.closeCallback(false)
if closedBy != none {
// if closed by user when processing, it "may" needs detach
needDetach := closedBy == user
// Here is a conor case that operator will be detached twice:
// If server closed the connection(client OnHup will detach op first and closeBy=poller),
// and then client's OnRequest function also closed the connection(closeBy=user).
// But operator already prevent that detach twice will not cause any problem
c.closeCallback(false, needDetach)
panicked = false
return
}
c.unlock(processing)
Expand All @@ -197,6 +224,7 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
goto START
}
// task exits
panicked = false
return
}

Expand All @@ -207,14 +235,14 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
// closeCallback .
// It can be confirmed that closeCallback and onRequest will not be executed concurrently.
// If onRequest is still running, it will trigger closeCallback on exit.
func (c *connection) closeCallback(needLock bool) (err error) {
func (c *connection) closeCallback(needLock bool, needDetach bool) (err error) {
if needLock && !c.lock(processing) {
return nil
}
// If Close is called during OnPrepare, poll is not registered.
if c.isCloseBy(user) && c.operator.poll != nil {
if err = c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: closeCallback detach operator failed: %v", err)
if needDetach && c.operator.poll != nil { // If Close is called during OnPrepare, poll is not registered.
// PollDetach only happen when user call conn.Close() or poller detect error
if err := c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: closeCallback[%v,%v] detach operator failed: %v", needLock, needDetach, err)
}
}
var latest = c.closeCallbacks.Load()
Expand All @@ -229,14 +257,7 @@ func (c *connection) closeCallback(needLock bool) (err error) {

// register only use for connection register into poll.
func (c *connection) register() (err error) {
if c.operator.isUnused() {
// operator is not registered
err = c.operator.Control(PollReadable)
} else {
// operator is already registered
// change event to wait read new data
err = c.operator.Control(PollModReadable)
}
err = c.operator.Control(PollReadable)
if err != nil {
logger.Printf("NETPOLL: connection register failed: %v", err)
c.Close()
Expand Down
Loading

0 comments on commit 0faba6e

Please sign in to comment.