diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index b35a04bf..9484b71c 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -3,35 +3,41 @@ name: Push and Pull Request Check on: [ push, pull_request ] jobs: - build: - runs-on: ubuntu-latest + compatibility-test: + strategy: + matrix: + go: [ 1.15, 1.19 ] + os: [ X64, ARM64 ] + runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v2 - + - uses: actions/checkout@v3 - name: Set up Go - uses: actions/setup-go@v2 + uses: actions/setup-go@v3 with: - go-version: 1.16 - + go-version: ${{ matrix.go }} - uses: actions/cache@v2 with: path: ~/go/pkg/mod key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} restore-keys: | ${{ runner.os }}-go- - + - name: Unit Test + run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./... + - name: Benchmark + run: go test -bench=. -benchmem -run=none ./... + style-test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: 1.16 - name: Check License Header uses: apache/skywalking-eyes/header@v0.4.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Lint run: | test -z "$(gofmt -s -l .)" go vet -stdmethods=false $(go list ./...) - - - name: Unit Test - run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./... - - - name: Benchmark - run: go test -bench=. -benchmem -run=none ./... diff --git a/connection_impl.go b/connection_impl.go index b4e92394..beb11d23 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -349,26 +349,31 @@ func (c *connection) initNetFD(conn Conn) { } func (c *connection) initFDOperator() { - op := allocop() + var op *FDOperator + if c.pd != nil && c.pd.operator != nil { + // reuse operator created at connect step + op = c.pd.operator + } else { + poll := pollmanager.Pick() + op = poll.Alloc() + } op.FD = c.fd op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup op.Inputs, op.InputAck = c.inputs, c.inputAck op.Outputs, op.OutputAck = c.outputs, c.outputAck - // if connection has been registered, must reuse poll here. - if c.pd != nil && c.pd.operator != nil { - op.poll = c.pd.operator.poll - } c.operator = op } func (c *connection) initFinalizer() { - c.AddCloseCallback(func(connection Connection) error { + c.AddCloseCallback(func(connection Connection) (err error) { c.stop(flushing) // stop the finalizing state to prevent conn.fill function to be performed c.stop(finalizing) - freeop(c.operator) - c.netFD.Close() + c.operator.Free() + if err = c.netFD.Close(); err != nil { + logger.Printf("NETPOLL: netFD close failed: %v", err) + } c.closeBuffer() return nil }) diff --git a/connection_onevent.go b/connection_onevent.go index b686cfd6..e8567ba9 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -213,7 +213,9 @@ func (c *connection) closeCallback(needLock bool) (err error) { } // If Close is called during OnPrepare, poll is not registered. if c.isCloseBy(user) && c.operator.poll != nil { - c.operator.Control(PollDetach) + if err = c.operator.Control(PollDetach); err != nil { + logger.Printf("NETPOLL: closeCallback detach operator failed: %v", err) + } } var latest = c.closeCallbacks.Load() if latest == nil { @@ -227,14 +229,16 @@ func (c *connection) closeCallback(needLock bool) (err error) { // register only use for connection register into poll. func (c *connection) register() (err error) { - if c.operator.poll != nil { - err = c.operator.Control(PollModReadable) - } else { - c.operator.poll = pollmanager.Pick() + if c.operator.isUnused() { + // operator is not registered err = c.operator.Control(PollReadable) + } else { + // operator is already registered + // change event to wait read new data + err = c.operator.Control(PollModReadable) } if err != nil { - logger.Println("connection register failed:", err.Error()) + logger.Printf("NETPOLL: connection register failed: %v", err) c.Close() return Exception(ErrConnClosed, err.Error()) } diff --git a/fd_operator.go b/fd_operator.go index 1b59967c..4132fe9c 100644 --- a/fd_operator.go +++ b/fd_operator.go @@ -45,12 +45,17 @@ type FDOperator struct { // private, used by operatorCache next *FDOperator state int32 // CAS: 0(unused) 1(inuse) 2(do-done) + index int32 // index in operatorCache } func (op *FDOperator) Control(event PollEvent) error { return op.poll.Control(op, event) } +func (op *FDOperator) Free() { + op.poll.Free(op) +} + func (op *FDOperator) do() (can bool) { return atomic.CompareAndSwapInt32(&op.state, 1, 2) } diff --git a/fd_operator_cache.go b/fd_operator_cache.go index 6e93fa0d..0e37ddeb 100644 --- a/fd_operator_cache.go +++ b/fd_operator_cache.go @@ -20,69 +20,80 @@ import ( "unsafe" ) -func allocop() *FDOperator { - return opcache.alloc() -} - -func freeop(op *FDOperator) { - opcache.free(op) -} - -func init() { - opcache = &operatorCache{ - // cache: make(map[int][]byte), - cache: make([]*FDOperator, 0, 1024), +func newOperatorCache() *operatorCache { + return &operatorCache{ + cache: make([]*FDOperator, 0, 1024), + freelist: make([]int32, 0, 1024), } - runtime.KeepAlive(opcache) } -var opcache *operatorCache - type operatorCache struct { locked int32 first *FDOperator cache []*FDOperator + // freelist store the freeable operator + // to reduce GC pressure, we only store op index here + freelist []int32 + freelocked int32 } func (c *operatorCache) alloc() *FDOperator { - c.lock() + lock(&c.locked) if c.first == nil { const opSize = unsafe.Sizeof(FDOperator{}) n := block4k / opSize if n == 0 { n = 1 } - // Must be in non-GC memory because can be referenced - // only from epoll/kqueue internals. + index := int32(len(c.cache)) for i := uintptr(0); i < n; i++ { - pd := &FDOperator{} + pd := &FDOperator{index: index} c.cache = append(c.cache, pd) pd.next = c.first c.first = pd + index++ } } op := c.first c.first = op.next - c.unlock() + unlock(&c.locked) return op } -func (c *operatorCache) free(op *FDOperator) { +// freeable mark the operator that could be freed +// only poller could do the real free action +func (c *operatorCache) freeable(op *FDOperator) { + // reset all state op.unused() op.reset() + lock(&c.freelocked) + c.freelist = append(c.freelist, op.index) + unlock(&c.freelocked) +} + +func (c *operatorCache) free() { + lock(&c.freelocked) + defer unlock(&c.freelocked) + if len(c.freelist) == 0 { + return + } - c.lock() - op.next = c.first - c.first = op - c.unlock() + lock(&c.locked) + for _, idx := range c.freelist { + op := c.cache[idx] + op.next = c.first + c.first = op + } + c.freelist = c.freelist[:0] + unlock(&c.locked) } -func (c *operatorCache) lock() { - for !atomic.CompareAndSwapInt32(&c.locked, 0, 1) { +func lock(locked *int32) { + for !atomic.CompareAndSwapInt32(locked, 0, 1) { runtime.Gosched() } } -func (c *operatorCache) unlock() { - atomic.StoreInt32(&c.locked, 0) +func unlock(locked *int32) { + atomic.StoreInt32(locked, 0) } diff --git a/fd_operator_cache_test.go b/fd_operator_cache_test.go index 3e2838c7..a92f15fb 100644 --- a/fd_operator_cache_test.go +++ b/fd_operator_cache_test.go @@ -24,14 +24,16 @@ import ( // go test -v -gcflags=-d=checkptr -run=TestPersistFDOperator func TestPersistFDOperator(t *testing.T) { + opcache := newOperatorCache() // init - size := 1000 + size := 2048 var ops = make([]*FDOperator, size) for i := 0; i < size; i++ { - op := allocop() + op := opcache.alloc() op.FD = i ops[i] = op } + Equal(t, len(opcache.freelist), 0) // gc for i := 0; i < 4; i++ { runtime.GC() @@ -39,16 +41,23 @@ func TestPersistFDOperator(t *testing.T) { // check alloc for i := range ops { Equal(t, ops[i].FD, i) - freeop(ops[i]) + opcache.freeable(ops[i]) + Equal(t, len(opcache.freelist), i+1) } + Equal(t, len(opcache.freelist), size) + opcache.free() + Equal(t, len(opcache.freelist), 0) + Assert(t, len(opcache.cache) >= size) } func BenchmarkPersistFDOperator1(b *testing.B) { b.ReportAllocs() b.ResetTimer() + opcache := newOperatorCache() for i := 0; i < b.N; i++ { - op := allocop() - freeop(op) + op := opcache.alloc() + opcache.freeable(op) + opcache.free() } } @@ -57,10 +66,12 @@ func BenchmarkPersistFDOperator2(b *testing.B) { b.ReportAllocs() b.SetParallelism(128) b.ResetTimer() + opcache := newOperatorCache() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - op := allocop() - freeop(op) + op := opcache.alloc() + opcache.freeable(op) + opcache.free() } }) } diff --git a/net_listener_test.go b/net_listener_test.go index 315bc9ea..9516f8e5 100644 --- a/net_listener_test.go +++ b/net_listener_test.go @@ -30,28 +30,19 @@ func TestListenerDialer(t *testing.T) { addr := ":1234" ln, err := CreateListener(network, addr) MustNil(t, err) - defer time.Sleep(10 * time.Millisecond) defer ln.Close() - - stop := make(chan int) trigger := make(chan int) - defer close(stop) - defer close(trigger) msg := []byte("0123456789") go func() { for { - select { - case <-stop: - err := ln.Close() - MustNil(t, err) - return - default: - } conn, err := ln.Accept() if conn == nil && err == nil { continue } + if err != nil { + return + } go func(conn net.Conn) { <-trigger buf := make([]byte, 10) diff --git a/net_netfd.go b/net_netfd.go index 0aa09e93..cea01a77 100644 --- a/net_netfd.go +++ b/net_netfd.go @@ -107,7 +107,7 @@ func (c *netFD) dial(ctx context.Context, laddr, raddr sockaddr) (err error) { return nil } -func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, ret error) { +func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, retErr error) { // Do not need to call c.writing here, // because c is not yet accessible to user, // so no concurrent operations are possible. @@ -134,45 +134,6 @@ func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa sysca return nil, os.NewSyscallError("connect", err) } - // TODO: can't support interrupter now. - // Start the "interrupter" goroutine, if this context might be canceled. - // (The background context cannot) - // - // The interrupter goroutine waits for the context to be done and - // interrupts the dial (by altering the c's write deadline, which - // wakes up waitWrite). - if ctx != context.Background() { - // Wait for the interrupter goroutine to exit before returning - // from connect. - done := make(chan struct{}) - interruptRes := make(chan error) - defer func() { - close(done) - if ctxErr := <-interruptRes; ctxErr != nil && ret == nil { - // The interrupter goroutine called SetWriteDeadline, - // but the connect code below had returned from - // waitWrite already and did a successful connect (ret - // == nil). Because we've now poisoned the connection - // by making it unwritable, don't return a successful - // dial. This was issue 16523. - ret = mapErr(ctxErr) - c.Close() // prevent a leak - } - }() - go func() { - select { - case <-ctx.Done(): - // Force the runtime's poller to immediately give up - // waiting for writability, unblocking waitWrite - // below. - c.SetWriteDeadline(aLongTimeAgo) - interruptRes <- ctx.Err() - case <-done: - interruptRes <- nil - } - }() - } - c.pd = newPollDesc(c.fd) for { // Performing multiple connect system calls on a diff --git a/net_netfd_conn.go b/net_netfd_conn.go index 44913f1c..c2ab43e0 100644 --- a/net_netfd_conn.go +++ b/net_netfd_conn.go @@ -62,7 +62,7 @@ func (c *netFD) Close() (err error) { if c.fd > 0 { err = syscall.Close(c.fd) if err != nil { - logger.Printf("netFD[%d] close error: %s", c.fd, err.Error()) + logger.Printf("NETPOLL: netFD[%d] close error: %s", c.fd, err.Error()) } } return err diff --git a/net_polldesc.go b/net_polldesc.go index 3a5ee32d..0b78c653 100644 --- a/net_polldesc.go +++ b/net_polldesc.go @@ -19,12 +19,13 @@ package netpoll import ( "context" - "sync" ) // TODO: recycle *pollDesc func newPollDesc(fd int) *pollDesc { - pd, op := &pollDesc{}, &FDOperator{} + pd := &pollDesc{} + poll := pollmanager.Pick() + op := poll.Alloc() op.FD = fd op.OnWrite = pd.onwrite op.OnHup = pd.onhup @@ -36,44 +37,46 @@ func newPollDesc(fd int) *pollDesc { } type pollDesc struct { - once sync.Once operator *FDOperator - // The write event is OneShot, then mark the writable to skip duplicate calling. writeTrigger chan struct{} closeTrigger chan struct{} } // WaitWrite . -// TODO: implement - poll support timeout hung up. -func (pd *pollDesc) WaitWrite(ctx context.Context) error { - var err error - pd.once.Do(func() { - // add ET|Write|Hup - pd.operator.poll = pollmanager.Pick() - err = pd.operator.Control(PollWritable) +func (pd *pollDesc) WaitWrite(ctx context.Context) (err error) { + defer func() { + // if return err != nil, upper caller function will close the connection if err != nil { - pd.detach() + pd.operator.Free() + } + }() + + if pd.operator.isUnused() { + // add ET|Write|Hup + if err = pd.operator.Control(PollWritable); err != nil { + logger.Printf("NETPOLL: pollDesc register operator failed: %v", err) + return err } - }) - if err != nil { - return err } select { + case <-pd.closeTrigger: + // no need to detach, since poller has done it in OnHup. + return Exception(ErrConnClosed, "by peer") + case <-pd.writeTrigger: + err = nil case <-ctx.Done(): + // deregister from poller, upper caller function will close fd pd.detach() - return mapErr(ctx.Err()) + err = mapErr(ctx.Err()) + } + // double check close trigger + select { case <-pd.closeTrigger: return Exception(ErrConnClosed, "by peer") - case <-pd.writeTrigger: - // if writable, check hup by select - select { - case <-pd.closeTrigger: - return Exception(ErrConnClosed, "by peer") - default: - return nil - } + default: + return err } } @@ -87,11 +90,16 @@ func (pd *pollDesc) onwrite(p Poll) error { } func (pd *pollDesc) onhup(p Poll) error { - close(pd.closeTrigger) + select { + case <-pd.closeTrigger: + default: + close(pd.closeTrigger) + } return nil } func (pd *pollDesc) detach() { - pd.operator.Control(PollDetach) - pd.operator.unused() + if err := pd.operator.Control(PollDetach); err != nil { + logger.Printf("NETPOLL: pollDesc detach operator failed: %v", err) + } } diff --git a/netpoll_server.go b/netpoll_server.go index baf5208f..2d6c5709 100644 --- a/netpoll_server.go +++ b/netpoll_server.go @@ -99,7 +99,7 @@ func (s *server) OnRead(p Poll) error { s.onQuit(err) return err } - logger.Println("accept conn failed:", err.Error()) + logger.Println("NETPOLL: accept conn failed:", err.Error()) return err } if conn == nil { diff --git a/poll.go b/poll.go index 6e72002c..1d5c42fb 100644 --- a/poll.go +++ b/poll.go @@ -34,6 +34,12 @@ type Poll interface { // Control the event of file descriptor and the operations is defined by PollEvent. Control(operator *FDOperator, event PollEvent) error + + // Alloc the operator from cache. + Alloc() (operator *FDOperator) + + // Free the operator from cache. + Free(operator *FDOperator) } // PollEvent defines the operation of poll.Control. diff --git a/poll_default_bsd.go b/poll_default_bsd.go index d2f087dd..62911bbd 100644 --- a/poll_default_bsd.go +++ b/poll_default_bsd.go @@ -43,12 +43,14 @@ func openDefaultPoll() *defaultPoll { if err != nil { panic(err) } + l.opcache = newOperatorCache() return l } type defaultPoll struct { fd int trigger uint32 + opcache *operatorCache // operator cache hups []func(p Poll) error } @@ -95,7 +97,7 @@ func (p *defaultPoll) Wait() error { var n, err = readv(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -122,7 +124,7 @@ func (p *defaultPoll) Wait() error { var n, err = sendmsg(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -133,6 +135,7 @@ func (p *defaultPoll) Wait() error { } // hup conns together to avoid blocking the poll. p.detaches() + p.opcache.free() } } @@ -164,11 +167,11 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { case PollReadable, PollModReadable: operator.inuse() evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_ADD|syscall.EV_ENABLE - case PollDetach: - evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE|syscall.EV_ONESHOT case PollWritable: operator.inuse() evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE|syscall.EV_ONESHOT + case PollDetach: + evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE|syscall.EV_ONESHOT case PollR2RW: evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE case PollRW2R: @@ -178,6 +181,16 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { return err } +func (p *defaultPoll) Alloc() (operator *FDOperator) { + op := p.opcache.alloc() + op.poll = p + return op +} + +func (p *defaultPoll) Free(operator *FDOperator) { + p.opcache.freeable(operator) +} + func (p *defaultPoll) appendHup(operator *FDOperator) { p.hups = append(p.hups, operator.OnHup) operator.Control(PollDetach) diff --git a/poll_default_linux.go b/poll_default_linux.go index fdb10e45..290f33f2 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -48,15 +48,17 @@ func openDefaultPoll() *defaultPoll { poll.wop = &FDOperator{FD: int(r0)} poll.Control(poll.wop, PollReadable) + poll.opcache = newOperatorCache() return &poll } type defaultPoll struct { pollArgs - fd int // epoll fd - wop *FDOperator // eventfd, wake epoll_wait - buf []byte // read wfd trigger msg - trigger uint32 // trigger flag + fd int // epoll fd + wop *FDOperator // eventfd, wake epoll_wait + buf []byte // read wfd trigger msg + trigger uint32 // trigger flag + opcache *operatorCache // operator cache // fns for handle events Reset func(size, caps int) Handler func(events []epollevent) (closed bool) @@ -102,6 +104,8 @@ func (p *defaultPoll) Wait() (err error) { if p.Handler(p.events[:n]) { return nil } + // we can make sure that there is no op remaining if Handler finished + p.opcache.free() } } @@ -133,18 +137,20 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { if operator.OnRead != nil { // for non-connection operator.OnRead(p) - } else { + } else if operator.Inputs != nil { // for connection var bs = operator.Inputs(p.barriers[i].bs) if len(bs) > 0 { var n, err = readv(operator.FD, bs, p.barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } } + } else { + logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator) } } @@ -168,7 +174,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { if operator.OnWrite != nil { // for non-connection operator.OnWrite(p) - } else { + } else if operator.Outputs != nil { // for connection var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs) if len(bs) > 0 { @@ -176,11 +182,13 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } } + } else { + logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator) } } operator.done() @@ -212,28 +220,39 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { var evt epollevent *(**FDOperator)(unsafe.Pointer(&evt.data)) = operator switch event { - case PollReadable: + case PollReadable: // server accept a new connection and wait read operator.inuse() op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollModReadable: + case PollWritable: // client create a new connection and wait connect finished operator.inuse() + op, evt.events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR + case PollModReadable: // client wait read/write op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollDetach: + case PollDetach: // deregister op, evt.events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollWritable: - operator.inuse() - op, evt.events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollR2RW: + case PollR2RW: // connection wait read/write op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollRW2R: + case PollRW2R: // connection wait read op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR } return EpollCtl(p.fd, op, operator.FD, &evt) } +func (p *defaultPoll) Alloc() (operator *FDOperator) { + op := p.opcache.alloc() + op.poll = p + return op +} + +func (p *defaultPoll) Free(operator *FDOperator) { + p.opcache.freeable(operator) +} + func (p *defaultPoll) appendHup(operator *FDOperator) { p.hups = append(p.hups, operator.OnHup) - operator.Control(PollDetach) + if err := operator.Control(PollDetach); err != nil { + logger.Printf("NETPOLL: poller detach operator failed: %v", err) + } operator.done() } diff --git a/poll_default_linux_test.go b/poll_default_linux_test.go index 7b6bdc23..4cb72eed 100644 --- a/poll_default_linux_test.go +++ b/poll_default_linux_test.go @@ -158,3 +158,166 @@ func TestEpollWait(t *testing.T) { err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event) MustNil(t, err) } + +func TestEpollETClose(t *testing.T) { + var epollfd, err = EpollCreate(0) + MustNil(t, err) + defer syscall.Close(epollfd) + rfd, wfd := GetSysFdPairs() + events := make([]epollevent, 128) + eventdata := [8]byte{0, 0, 0, 0, 0, 0, 0, 1} + event := &epollevent{ + events: EPOLLET | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: eventdata, + } + + // EPOLL: init state + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) + + // EPOLL: close current fd + // nothing will happen + err = syscall.Close(rfd) + MustNil(t, err) + n, err := EpollWait(epollfd, events, 100) + MustNil(t, err) + Assert(t, n == 0, n) + err = syscall.Close(wfd) + MustNil(t, err) + + // EPOLL: close peer fd + // EPOLLOUT + rfd, wfd = GetSysFdPairs() + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + err = syscall.Close(wfd) + MustNil(t, err) + n, err = EpollWait(epollfd, events, 100) + MustNil(t, err) + Assert(t, n == 1, n) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP != 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) +} + +func TestEpollETDel(t *testing.T) { + var epollfd, err = EpollCreate(0) + MustNil(t, err) + defer syscall.Close(epollfd) + rfd, wfd := GetSysFdPairs() + send := []byte("hello") + events := make([]epollevent, 128) + eventdata := [8]byte{0, 0, 0, 0, 0, 0, 0, 1} + event := &epollevent{ + events: EPOLLET | syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: eventdata, + } + + // EPOLL: del partly + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + MustNil(t, err) + event.events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR + err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event) + MustNil(t, err) + _, err = syscall.Write(wfd, send) + MustNil(t, err) + _, err = EpollWait(epollfd, events, 100) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLIN == 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) +} + +func TestEpollConnectSameFD(t *testing.T) { + var epollfd, err = EpollCreate(0) + MustNil(t, err) + defer syscall.Close(epollfd) + events := make([]epollevent, 128) + eventdata1 := [8]byte{0, 0, 0, 0, 0, 0, 0, 1} + eventdata2 := [8]byte{0, 0, 0, 0, 0, 0, 0, 2} + event1 := &epollevent{ + events: EPOLLET | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: eventdata1, + } + event2 := &epollevent{ + events: EPOLLET | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: eventdata2, + } + eventin := &epollevent{ + events: syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: eventdata1, + } + addr := syscall.SockaddrInet4{ + Port: 53, + Addr: [4]byte{8, 8, 8, 8}, + } + + // connect non-block socket + fd1, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP) + MustNil(t, err) + t.Logf("create fd: %d", fd1) + err = syscall.SetNonblock(fd1, true) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd1, event1) + MustNil(t, err) + err = syscall.Connect(fd1, &addr) + t.Log(err) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) + // forget to del fd + //err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd1, event1) + //MustNil(t, err) + err = syscall.Close(fd1) // close fd1 + MustNil(t, err) + + // connect non-block socket with same fd + fd2, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP) + MustNil(t, err) + t.Logf("create fd: %d", fd2) + err = syscall.SetNonblock(fd2, true) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd2, event2) + MustNil(t, err) + err = syscall.Connect(fd2, &addr) + t.Log(err) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) + err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd2, event2) + MustNil(t, err) + err = syscall.Close(fd2) // close fd2 + MustNil(t, err) + Equal(t, events[0].data, eventdata2) + + // no event after close fd + fd3, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP) + MustNil(t, err) + t.Logf("create fd: %d", fd3) + err = syscall.SetNonblock(fd3, true) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd3, event1) + MustNil(t, err) + err = syscall.Connect(fd3, &addr) + t.Log(err) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, fd3, eventin) + MustNil(t, err) + err = syscall.Close(fd3) // close fd3 + MustNil(t, err) + n, err := EpollWait(epollfd, events, 100) + MustNil(t, err) + Assert(t, n == 0) +} diff --git a/poll_manager.go b/poll_manager.go index 9f6f500d..2c2e8097 100644 --- a/poll_manager.go +++ b/poll_manager.go @@ -72,7 +72,7 @@ func (m *manager) SetNumLoops(numLoops int) error { polls[idx] = m.polls[idx] } else { if err := m.polls[idx].Close(); err != nil { - logger.Printf("poller close failed: %v\n", err) + logger.Printf("NETPOLL: poller close failed: %v\n", err) } } } diff --git a/poll_race_bsd.go b/poll_race_bsd.go index 39540717..5caf393d 100644 --- a/poll_race_bsd.go +++ b/poll_race_bsd.go @@ -44,6 +44,7 @@ func openDefaultPoll() *defaultPoll { if err != nil { panic(err) } + l.opcache = newOperatorCache() return l } @@ -51,6 +52,7 @@ type defaultPoll struct { fd int trigger uint32 m sync.Map + opcache *operatorCache // operator cache hups []func(p Poll) error } @@ -102,7 +104,7 @@ func (p *defaultPoll) Wait() error { var n, err = readv(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -129,7 +131,7 @@ func (p *defaultPoll) Wait() error { var n, err = sendmsg(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -140,6 +142,7 @@ func (p *defaultPoll) Wait() error { } // hup conns together to avoid blocking the poll. p.detaches() + p.opcache.free() } } @@ -195,9 +198,21 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { return err } +func (p *defaultPoll) Alloc() (operator *FDOperator) { + op := p.opcache.alloc() + op.poll = p + return op +} + +func (p *defaultPoll) Free(operator *FDOperator) { + p.opcache.freeable(operator) +} + func (p *defaultPoll) appendHup(operator *FDOperator) { p.hups = append(p.hups, operator.OnHup) - operator.Control(PollDetach) + if err := operator.Control(PollDetach); err != nil { + logger.Printf("NETPOLL: poller detach operator failed: %v", err) + } operator.done() } diff --git a/poll_race_linux.go b/poll_race_linux.go index 8ede0d99..254e5c89 100644 --- a/poll_race_linux.go +++ b/poll_race_linux.go @@ -44,6 +44,7 @@ func openDefaultPoll() *defaultPoll { } poll.wfd = int(r0) poll.Control(&FDOperator{FD: poll.wfd}, PollReadable) + poll.opcache = newOperatorCache() return &poll } @@ -54,6 +55,7 @@ type defaultPoll struct { buf []byte // read wfd trigger msg trigger uint32 // trigger flag m sync.Map + opcache *operatorCache // operator cache } type pollArgs struct { @@ -96,6 +98,7 @@ func (p *defaultPoll) Wait() (err error) { if p.handler(p.events[:n]) { return nil } + p.opcache.free() } } @@ -130,18 +133,20 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { if operator.OnRead != nil { // for non-connection operator.OnRead(p) - } else { + } else if operator.Inputs != nil { // for connection var bs = operator.Inputs(p.barriers[i].bs) if len(bs) > 0 { var n, err = readv(operator.FD, bs, p.barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } } + } else { + logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator) } } @@ -166,7 +171,7 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { if operator.OnWrite != nil { // for non-connection operator.OnWrite(p) - } else { + } else if operator.Outputs != nil { // for connection var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs) if len(bs) > 0 { @@ -174,11 +179,13 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } } + } else { + logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator) } } operator.done() @@ -222,17 +229,16 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { operator.inuse() p.m.Store(operator.FD, operator) op, evt.Events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollModReadable: + case PollWritable: operator.inuse() + p.m.Store(operator.FD, operator) + op, evt.Events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR + case PollModReadable: p.m.Store(operator.FD, operator) op, evt.Events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR case PollDetach: p.m.Delete(operator.FD) op, evt.Events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollWritable: - operator.inuse() - p.m.Store(operator.FD, operator) - op, evt.Events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR case PollR2RW: op, evt.Events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR case PollRW2R: @@ -241,6 +247,16 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { return syscall.EpollCtl(p.fd, op, operator.FD, &evt) } +func (p *defaultPoll) Alloc() (operator *FDOperator) { + op := p.opcache.alloc() + op.poll = p + return op +} + +func (p *defaultPoll) Free(operator *FDOperator) { + p.opcache.freeable(operator) +} + func (p *defaultPoll) appendHup(operator *FDOperator) { p.hups = append(p.hups, operator.OnHup) operator.Control(PollDetach) diff --git a/poll_test.go b/poll_test.go index 6775b973..c30dab83 100644 --- a/poll_test.go +++ b/poll_test.go @@ -18,6 +18,7 @@ package netpoll import ( + "runtime" "sync" "sync/atomic" "syscall" @@ -83,22 +84,28 @@ func TestPollMod(t *testing.T) { err = p.Control(wop, PollWritable) // trigger one shot MustNil(t, err) - time.Sleep(50 * time.Millisecond) + for atomic.LoadInt32(&wn) == 0 { + runtime.Gosched() + } r, w, h = atomic.LoadInt32(&rn), atomic.LoadInt32(&wn), atomic.LoadInt32(&hn) Assert(t, r == 0 && w == 1 && h == 0, r, w, h) err = p.Control(rop, PollR2RW) // trigger write MustNil(t, err) - time.Sleep(time.Millisecond) + for atomic.LoadInt32(&wn) <= 1 { + runtime.Gosched() + } r, w, h = atomic.LoadInt32(&rn), atomic.LoadInt32(&wn), atomic.LoadInt32(&hn) Assert(t, r == 0 && w >= 2 && h == 0, r, w, h) // close wfd, then trigger hup rfd err = syscall.Close(wfd) // trigger hup MustNil(t, err) - time.Sleep(time.Millisecond) - r, w, h = atomic.LoadInt32(&rn), atomic.LoadInt32(&wn), atomic.LoadInt32(&hn) - Assert(t, r == 1 && w >= 2 && h >= 1, r, w, h) + for atomic.LoadInt32(&hn) == 0 { + runtime.Gosched() + } + w, h = atomic.LoadInt32(&wn), atomic.LoadInt32(&hn) + Assert(t, w >= 2 && h >= 1, r, w, h) p.Close() err = <-stop