diff --git a/README.md b/README.md index 151e34c4..5a8662d5 100644 --- a/README.md +++ b/README.md @@ -33,9 +33,8 @@ excellent [Performance](#performance), and is more suitable for microservice arc Also [Netpoll][Netpoll] provides a number of [Features](#features), and it is recommended to replace [net][net] in some RPC scenarios. -We developed the RPC framework [Kitex][Kitex] and HTTP -framework [Hertz][Hertz] (coming soon) based -on [Netpoll][Netpoll], both with industry-leading performance. +We developed the RPC framework [Kitex][Kitex] and HTTP framework [Hertz][Hertz] +based on [Netpoll][Netpoll], both with industry-leading performance. [Examples][netpoll-examples] show how to build RPC client and server using [Netpoll][Netpoll]. @@ -73,7 +72,7 @@ In the RPC scenario, concurrency and timeout are necessary support items. We provide the [netpoll-benchmark][netpoll-benchmark] project to track and compare the performance of [Netpoll][Netpoll] and other frameworks under different conditions for reference. -More benchmarks reference [kitex-benchmark][kitex-benchmark] and [hertz-benchmark][hertz-benchmark] (open source soon). +More benchmarks reference [kitex-benchmark][kitex-benchmark] and [hertz-benchmark][hertz-benchmark]. ## Reference diff --git a/README_CN.md b/README_CN.md index a209f3ed..8e8d0bac 100644 --- a/README_CN.md +++ b/README_CN.md @@ -29,8 +29,7 @@ goroutine,大幅增加调度开销。此外,[net.Conn][net.Conn] 没有提 同时,[Netpoll][Netpoll] 还提供了一些 [特性](#特性),推荐在 RPC 设计中替代 [net][net] 。 -基于 [Netpoll][Netpoll] 开发的 RPC 框架 [Kitex][Kitex] 和 HTTP -框架 [Hertz][Hertz] (即将开源),性能均业界领先。 +基于 [Netpoll][Netpoll] 开发的 RPC 框架 [Kitex][Kitex] 和 HTTP 框架 [Hertz][Hertz],性能均业界领先。 [范例][netpoll-examples] 展示了如何使用 [Netpoll][Netpoll] 构建 RPC Client 和 Server。 @@ -66,7 +65,7 @@ goroutine,大幅增加调度开销。此外,[net.Conn][net.Conn] 没有提 我们提供了 [netpoll-benchmark][netpoll-benchmark] 项目用来长期追踪和比较 [Netpoll][Netpoll] 与其他框架在不同情况下的性能数据以供参考。 -更多测试参考 [kitex-benchmark][kitex-benchmark] 和 [hertz-benchmark][hertz-benchmark] (即将开源) +更多测试参考 [kitex-benchmark][kitex-benchmark] 和 [hertz-benchmark][hertz-benchmark] ## 参考 diff --git a/connection_onevent.go b/connection_onevent.go index a4bcc286..6d875b41 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -54,32 +54,37 @@ type callbackNode struct { } // SetOnConnect set the OnConnect callback. -func (on *onEvent) SetOnConnect(onConnect OnConnect) error { +func (c *connection) SetOnConnect(onConnect OnConnect) error { if onConnect != nil { - on.onConnectCallback.Store(onConnect) + c.onConnectCallback.Store(onConnect) } return nil } // SetOnRequest initialize ctx when setting OnRequest. -func (on *onEvent) SetOnRequest(onRequest OnRequest) error { - if onRequest != nil { - on.onRequestCallback.Store(onRequest) +func (c *connection) SetOnRequest(onRequest OnRequest) error { + if onRequest == nil { + return nil + } + c.onRequestCallback.Store(onRequest) + // fix: trigger OnRequest if there is already input data. + if !c.inputBuffer.IsEmpty() { + c.onRequest() } return nil } // AddCloseCallback adds a CloseCallback to this connection. -func (on *onEvent) AddCloseCallback(callback CloseCallback) error { +func (c *connection) AddCloseCallback(callback CloseCallback) error { if callback == nil { return nil } var cb = &callbackNode{} cb.fn = callback - if pre := on.closeCallbacks.Load(); pre != nil { + if pre := c.closeCallbacks.Load(); pre != nil { cb.pre = pre.(*callbackNode) } - on.closeCallbacks.Store(cb) + c.closeCallbacks.Store(cb) return nil } @@ -203,6 +208,10 @@ func (c *connection) closeCallback(needLock bool) (err error) { if needLock && !c.lock(processing) { return nil } + // If Close is called during OnPrepare, poll is not registered. + if c.closeBy(user) && c.operator.poll != nil { + c.operator.Control(PollDetach) + } var latest = c.closeCallbacks.Load() if latest == nil { return nil diff --git a/connection_reactor.go b/connection_reactor.go index 6b621f72..c175fca0 100644 --- a/connection_reactor.go +++ b/connection_reactor.go @@ -41,10 +41,6 @@ func (c *connection) onHup(p Poll) error { // onClose means close by user. func (c *connection) onClose() error { if c.closeBy(user) { - // If Close is called during OnPrepare, poll is not registered. - if c.operator.poll != nil { - c.operator.Control(PollDetach) - } c.triggerRead() c.triggerWrite(ErrConnClosed) c.closeCallback(true) @@ -157,7 +153,9 @@ func (c *connection) flush() error { if err != nil { return Exception(err, "when flush") } - err = <-c.writeTrigger - return err + if err != nil { + return Exception(err, "when flush") + } + return nil } diff --git a/net_polldesc.go b/net_polldesc.go index 740ef444..b199372a 100644 --- a/net_polldesc.go +++ b/net_polldesc.go @@ -59,7 +59,7 @@ func (pd *pollDesc) WaitWrite(ctx context.Context) error { pd.operator.poll = pollmanager.Pick() err = pd.operator.Control(PollWritable) if err != nil { - pd.operator.Control(PollDetach) + pd.detach() } }) if err != nil { @@ -68,7 +68,7 @@ func (pd *pollDesc) WaitWrite(ctx context.Context) error { select { case <-ctx.Done(): - pd.operator.Control(PollDetach) + pd.detach() return mapErr(ctx.Err()) case <-pd.closeTrigger: return Exception(ErrConnClosed, "by peer") @@ -82,3 +82,8 @@ func (pd *pollDesc) WaitWrite(ctx context.Context) error { } } } + +func (pd *pollDesc) detach() { + pd.operator.Control(PollDetach) + pd.operator.unused() +} diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index 9af49fb6..c581901b 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -327,6 +327,27 @@ func TestLinkBufferRefer(t *testing.T) { Equal(t, buf.read.Len(), block8k-9) } +func TestLinkBufferResetTail(t *testing.T) { + except := byte(1) + + LinkBufferCap = 8 + buf := NewLinkBuffer() + + // 1. slice reader + buf.WriteByte(except) + buf.Flush() + r1, _ := buf.Slice(1) + fmt.Printf("1: %x\n", buf.flush.buf) + // 2. release & reset tail + buf.resetTail(LinkBufferCap) + buf.WriteByte(byte(2)) + fmt.Printf("2: %x\n", buf.flush.buf) + + // check slice reader + got, _ := r1.ReadByte() + Equal(t, got, except) +} + func TestWriteBuffer(t *testing.T) { buf1 := NewLinkBuffer() buf2 := NewLinkBuffer() diff --git a/poll_default_linux.go b/poll_default_linux.go index 2fbffbfb..49a170ff 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -109,6 +109,9 @@ func (p *defaultPoll) Wait() (err error) { func (p *defaultPoll) handler(events []epollevent) (closed bool) { for i := range events { var operator = *(**FDOperator)(unsafe.Pointer(&events[i].data)) + if !operator.do() { + continue + } // trigger or exit gracefully if operator.FD == p.wop.FD { // must clean trigger first @@ -118,11 +121,10 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { if p.buf[0] > 0 { syscall.Close(p.wop.FD) syscall.Close(p.fd) + operator.done() return true } - continue - } - if !operator.do() { + operator.done() continue } diff --git a/poll_manager.go b/poll_manager.go index 3fded62e..9f23c777 100644 --- a/poll_manager.go +++ b/poll_manager.go @@ -49,7 +49,7 @@ type manager struct { // SetNumLoops will return error when set numLoops < 1 func (m *manager) SetNumLoops(numLoops int) error { if numLoops < 1 { - return fmt.Errorf("set invaild numLoops[%d]", numLoops) + return fmt.Errorf("set invalid numLoops[%d]", numLoops) } if numLoops < m.NumLoops { diff --git a/poll_race_bsd.go b/poll_race_bsd.go index 3b8171e8..39b2d7e6 100644 --- a/poll_race_bsd.go +++ b/poll_race_bsd.go @@ -82,12 +82,11 @@ func (p *defaultPoll) Wait() error { atomic.StoreUint32(&p.trigger, 0) continue } - var operator *FDOperator - if tmp, ok := p.m.Load(fd); ok { - operator = tmp.(*FDOperator) - } else { + tmp, ok := p.m.Load(fd) + if !ok { continue } + operator := tmp.(*FDOperator) if !operator.do() { continue } diff --git a/poll_race_linux.go b/poll_race_linux.go index 948d0133..85fdff6c 100644 --- a/poll_race_linux.go +++ b/poll_race_linux.go @@ -102,14 +102,9 @@ func (p *defaultPoll) Wait() (err error) { func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { for i := range events { - var operator *FDOperator - if tmp, ok := p.m.Load(int(events[i].Fd)); ok { - operator = tmp.(*FDOperator) - } else { - continue - } + var fd = int(events[i].Fd) // trigger or exit gracefully - if operator.FD == p.wfd { + if fd == p.wfd { // must clean trigger first syscall.Read(p.wfd, p.buf) atomic.StoreUint32(&p.trigger, 0) @@ -121,6 +116,11 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { } continue } + tmp, ok := p.m.Load(fd) + if !ok { + continue + } + operator := tmp.(*FDOperator) if !operator.do() { continue } diff --git a/sys_exec.go b/sys_exec.go index 33122adf..511a26fb 100644 --- a/sys_exec.go +++ b/sys_exec.go @@ -68,6 +68,7 @@ func writev(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) { } // syscall r, _, e := syscall.RawSyscall(syscall.SYS_WRITEV, uintptr(fd), uintptr(unsafe.Pointer(&ivs[0])), uintptr(iovLen)) + resetIovecs(bs, ivs[:iovLen]) if e != 0 { return int(r), syscall.Errno(e) } @@ -83,6 +84,7 @@ func readv(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) { } // syscall r, _, e := syscall.RawSyscall(syscall.SYS_READV, uintptr(fd), uintptr(unsafe.Pointer(&ivs[0])), uintptr(iovLen)) + resetIovecs(bs, ivs[:iovLen]) if e != 0 { return int(r), syscall.Errno(e) } @@ -98,15 +100,22 @@ func iovecs(bs [][]byte, ivs []syscall.Iovec) (iovLen int) { if len(chunk) == 0 { continue } - iov := &syscall.Iovec{Base: &chunk[0]} - iov.SetLen(len(chunk)) - // append - ivs[iovLen] = *iov + ivs[iovLen].Base = &chunk[0] + ivs[iovLen].SetLen(len(chunk)) iovLen++ } return iovLen } +func resetIovecs(bs [][]byte, ivs []syscall.Iovec) { + for i := 0; i < len(bs); i++ { + bs[i] = nil + } + for i := 0; i < len(ivs); i++ { + ivs[i].Base = nil + } +} + // Boolean to int. func boolint(b bool) int { if b { diff --git a/sys_sendmsg_bsd.go b/sys_sendmsg_bsd.go index 263efb07..d605ae4d 100644 --- a/sys_sendmsg_bsd.go +++ b/sys_sendmsg_bsd.go @@ -36,6 +36,7 @@ func sendmsg(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, er } // flags = syscall.MSG_DONTWAIT r, _, e := syscall.RawSyscall(syscall.SYS_SENDMSG, uintptr(fd), uintptr(unsafe.Pointer(&msghdr)), uintptr(0)) + resetIovecs(bs, ivs[:iovLen]) if e != 0 { return int(r), syscall.Errno(e) } diff --git a/sys_sendmsg_linux.go b/sys_sendmsg_linux.go index 86d7af92..5a849db2 100644 --- a/sys_sendmsg_linux.go +++ b/sys_sendmsg_linux.go @@ -45,6 +45,7 @@ func sendmsg(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, er flags = MSG_ZEROCOPY } r, _, e := syscall.RawSyscall(syscall.SYS_SENDMSG, uintptr(fd), uintptr(unsafe.Pointer(&msghdr)), flags) + resetIovecs(bs, ivs[:iovLen]) if e != 0 { return int(r), syscall.Errno(e) }