Skip to content

Commit

Permalink
Merge pull request #260 from cloudwego/release/v0.4.0
Browse files Browse the repository at this point in the history
chore: release v0.4.0
  • Loading branch information
joway authored Jun 13, 2023
2 parents 88cace1 + 4b09897 commit a8756d2
Show file tree
Hide file tree
Showing 25 changed files with 562 additions and 738 deletions.
69 changes: 16 additions & 53 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ type connection struct {
bookSize int // The size of data that can be read at once.
}

var _ Connection = &connection{}
var _ Reader = &connection{}
var _ Writer = &connection{}
var (
_ Connection = &connection{}
_ Reader = &connection{}
_ Writer = &connection{}
)

// Reader implements Connection.
func (c *connection) Reader() Reader {
Expand Down Expand Up @@ -168,7 +170,7 @@ func (c *connection) Until(delim byte) (line []byte, err error) {
l = c.inputBuffer.Len()
i := c.inputBuffer.indexByte(delim, n)
if i < 0 {
n = l //skip all exists bytes
n = l // skip all exists bytes
continue
}
return c.Next(i + 1)
Expand Down Expand Up @@ -297,6 +299,12 @@ func (c *connection) Close() error {
return c.onClose()
}

// Detach detaches the connection from poller but doesn't close it.
func (c *connection) Detach() error {
c.detaching = true
return c.onClose()
}

// ------------------------------------------ private ------------------------------------------

var barrierPool = sync.Pool{
Expand Down Expand Up @@ -368,8 +376,6 @@ func (c *connection) initFDOperator() {
func (c *connection) initFinalizer() {
c.AddCloseCallback(func(connection Connection) (err error) {
c.stop(flushing)
// stop the finalizing state to prevent conn.fill function to be performed
c.stop(finalizing)
c.operator.Free()
if err = c.netFD.Close(); err != nil {
logger.Printf("NETPOLL: netFD close failed: %v", err)
Expand Down Expand Up @@ -405,15 +411,10 @@ func (c *connection) waitRead(n int) (err error) {
}
// wait full n
for c.inputBuffer.Len() < n {
if c.IsActive() {
<-c.readTrigger
continue
}
// confirm that fd is still valid.
if atomic.LoadUint32(&c.netFD.closed) == 0 {
return c.fill(n)
if !c.IsActive() {
return Exception(ErrConnClosed, "wait read")
}
return Exception(ErrConnClosed, "wait read")
<-c.readTrigger
}
return nil
}
Expand All @@ -430,12 +431,7 @@ func (c *connection) waitReadWithTimeout(n int) (err error) {
for c.inputBuffer.Len() < n {
if !c.IsActive() {
// cannot return directly, stop timer before !
// confirm that fd is still valid.
if atomic.LoadUint32(&c.netFD.closed) == 0 {
err = c.fill(n)
} else {
err = Exception(ErrConnClosed, "wait read")
}
err = Exception(ErrConnClosed, "wait read")
break
}

Expand All @@ -458,39 +454,6 @@ func (c *connection) waitReadWithTimeout(n int) (err error) {
return err
}

// fill data after connection is closed.
func (c *connection) fill(need int) (err error) {
if !c.lock(finalizing) {
return ErrConnClosed
}
defer c.unlock(finalizing)

var n int
var bs [][]byte
for {
bs = c.inputs(c.inputBarrier.bs)
TryRead:
n, err = readv(c.fd, bs, c.inputBarrier.ivs)
if err != nil {
if err == syscall.EINTR {
// if err == EINTR, we must reuse bs that has been booked
// otherwise will mess the input buffer
goto TryRead
}
break
}
if n == 0 {
err = Exception(ErrEOF, "")
break
}
c.inputAck(n)
}
if c.inputBuffer.Len() >= need {
return nil
}
return err
}

// flush write data directly.
func (c *connection) flush() error {
if c.outputBuffer.IsEmpty() {
Expand Down
1 change: 0 additions & 1 deletion connection_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ const (
closing key = iota
processing
flushing
finalizing
// total must be at the bottom.
total
)
Expand Down
2 changes: 1 addition & 1 deletion connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c *connection) AddCloseCallback(callback CloseCallback) error {
return nil
}

// OnPrepare supports close connection, but not read/write data.
// onPrepare supports close connection, but not read/write data.
// connection will be registered by this call after preparing.
func (c *connection) onPrepare(opts *options) (err error) {
if opts != nil {
Expand Down
61 changes: 60 additions & 1 deletion connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"errors"
"fmt"
"math/rand"
"net"
"os"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -395,7 +397,7 @@ func TestConnectionUntil(t *testing.T) {

buf, err := rconn.Reader().Until('\n')
Equal(t, len(buf), 100)
MustTrue(t, errors.Is(err, ErrEOF))
Assert(t, errors.Is(err, ErrConnClosed), err)
}

func TestBookSizeLargerThanMaxSize(t *testing.T) {
Expand Down Expand Up @@ -432,3 +434,60 @@ func TestBookSizeLargerThanMaxSize(t *testing.T) {
wg.Wait()
rconn.Close()
}

func TestConnDetach(t *testing.T) {
ln, err := CreateListener("tcp", ":1234")
MustNil(t, err)

go func() {
for {
conn, err := ln.Accept()
if err != nil {
return
}
if conn == nil {
continue
}
go func() {
buf := make([]byte, 1024)
// slow read
for {
_, err := conn.Read(buf)
if err != nil {
return
}
time.Sleep(100 * time.Millisecond)
_, err = conn.Write(buf)
if err != nil {
return
}
}
}()
}
}()

c, err := DialConnection("tcp", ":1234", time.Second)
MustNil(t, err)

conn := c.(*TCPConnection)

err = conn.Detach()
MustNil(t, err)

f := os.NewFile(uintptr(conn.fd), "netpoll-connection")
defer f.Close()

gonetconn, err := net.FileConn(f)
MustNil(t, err)
buf := make([]byte, 1024)
_, err = gonetconn.Write(buf)
MustNil(t, err)
_, err = gonetconn.Read(buf)
MustNil(t, err)

err = gonetconn.Close()
MustNil(t, err)

err = ln.Close()
MustNil(t, err)
}
2 changes: 1 addition & 1 deletion mux/shard_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (q *ShardQueue) Close() error {
// wait for all tasks finished
for atomic.LoadInt32(&q.state) != closed {
if atomic.LoadInt32(&q.trigger) == 0 {
atomic.StoreInt32(&q.trigger, closed)
atomic.StoreInt32(&q.state, closed)
return nil
}
runtime.Gosched()
Expand Down
42 changes: 42 additions & 0 deletions net_io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package netpoll

import "syscall"

// return value:
// - n: n == 0 but err == nil, retry syscall
// - err: if not nil, connection should be closed.
func ioread(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) {
n, err = readv(fd, bs, ivs)
if n == 0 && err == nil { // means EOF
return 0, Exception(ErrEOF, "")
}
if err == syscall.EINTR || err == syscall.EAGAIN {
return 0, nil
}
return n, err
}

// return value:
// - n: n == 0 but err == nil, retry syscall
// - err: if not nil, connection should be closed.
func iosend(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, err error) {
n, err = sendmsg(fd, bs, ivs, zerocopy)
if err == syscall.EAGAIN {
return 0, nil
}
return n, err
}
2 changes: 2 additions & 0 deletions net_netfd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type netFD struct {
network string // tcp tcp4 tcp6, udp, udp4, udp6, ip, ip4, ip6, unix, unixgram, unixpacket
localAddr net.Addr
remoteAddr net.Addr
// for detaching conn from poller
detaching bool
}

func newNetFD(fd, family, sotype int, net string) *netFD {
Expand Down
2 changes: 1 addition & 1 deletion net_netfd_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *netFD) Close() (err error) {
if atomic.AddUint32(&c.closed, 1) != 1 {
return nil
}
if c.fd > 0 {
if !c.detaching && c.fd > 2 {
err = syscall.Close(c.fd)
if err != nil {
logger.Printf("NETPOLL: netFD[%d] close error: %s", c.fd, err.Error())
Expand Down
9 changes: 5 additions & 4 deletions netpoll_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ import (
// Experience recommends assigning a poller every 20c.
//
// You can only use SetNumLoops before any connection is created. An example usage:
// func init() {
// netpoll.SetNumLoops(...)
// }
//
// func init() {
// netpoll.SetNumLoops(...)
// }
func SetNumLoops(numLoops int) error {
return setNumLoops(numLoops)
}

// LoadBalance sets the load balancing method. Load balancing is always a best effort to attempt
// SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt
// to distribute the incoming connections between multiple polls.
// This option only works when NumLoops is set.
func SetLoadBalance(lb LoadBalance) error {
Expand Down
Loading

0 comments on commit a8756d2

Please sign in to comment.