Skip to content

Commit

Permalink
Merge pull request #171 from cloudwego/release/v0.2.5
Browse files Browse the repository at this point in the history
chore: release v0.2.5
  • Loading branch information
joway authored Jul 13, 2022
2 parents 4a19aee + 601dfa0 commit 93eca42
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 42 deletions.
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ excellent [Performance](#performance), and is more suitable for microservice arc
Also [Netpoll][Netpoll] provides a number of [Features](#features), and it is recommended
to replace [net][net] in some RPC scenarios.

We developed the RPC framework [Kitex][Kitex] and HTTP
framework [Hertz][Hertz] (coming soon) based
on [Netpoll][Netpoll], both with industry-leading performance.
We developed the RPC framework [Kitex][Kitex] and HTTP framework [Hertz][Hertz]
based on [Netpoll][Netpoll], both with industry-leading performance.

[Examples][netpoll-examples] show how to build RPC client and server
using [Netpoll][Netpoll].
Expand Down Expand Up @@ -73,7 +72,7 @@ In the RPC scenario, concurrency and timeout are necessary support items.
We provide the [netpoll-benchmark][netpoll-benchmark] project to track and compare
the performance of [Netpoll][Netpoll] and other frameworks under different conditions for reference.

More benchmarks reference [kitex-benchmark][kitex-benchmark] and [hertz-benchmark][hertz-benchmark] (open source soon).
More benchmarks reference [kitex-benchmark][kitex-benchmark] and [hertz-benchmark][hertz-benchmark].

## Reference

Expand Down
5 changes: 2 additions & 3 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ goroutine,大幅增加调度开销。此外,[net.Conn][net.Conn] 没有提
同时,[Netpoll][Netpoll] 还提供了一些 [特性](#特性),推荐在 RPC 设计中替代
[net][net]

基于 [Netpoll][Netpoll] 开发的 RPC 框架 [Kitex][Kitex] 和 HTTP
框架 [Hertz][Hertz] (即将开源),性能均业界领先。
基于 [Netpoll][Netpoll] 开发的 RPC 框架 [Kitex][Kitex] 和 HTTP 框架 [Hertz][Hertz],性能均业界领先。

[范例][netpoll-examples] 展示了如何使用 [Netpoll][Netpoll]
构建 RPC Client 和 Server。
Expand Down Expand Up @@ -66,7 +65,7 @@ goroutine,大幅增加调度开销。此外,[net.Conn][net.Conn] 没有提

我们提供了 [netpoll-benchmark][netpoll-benchmark] 项目用来长期追踪和比较 [Netpoll][Netpoll] 与其他框架在不同情况下的性能数据以供参考。

更多测试参考 [kitex-benchmark][kitex-benchmark][hertz-benchmark][hertz-benchmark] (即将开源)
更多测试参考 [kitex-benchmark][kitex-benchmark][hertz-benchmark][hertz-benchmark]

## 参考

Expand Down
25 changes: 17 additions & 8 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,37 @@ type callbackNode struct {
}

// SetOnConnect set the OnConnect callback.
func (on *onEvent) SetOnConnect(onConnect OnConnect) error {
func (c *connection) SetOnConnect(onConnect OnConnect) error {
if onConnect != nil {
on.onConnectCallback.Store(onConnect)
c.onConnectCallback.Store(onConnect)
}
return nil
}

// SetOnRequest initialize ctx when setting OnRequest.
func (on *onEvent) SetOnRequest(onRequest OnRequest) error {
if onRequest != nil {
on.onRequestCallback.Store(onRequest)
func (c *connection) SetOnRequest(onRequest OnRequest) error {
if onRequest == nil {
return nil
}
c.onRequestCallback.Store(onRequest)
// fix: trigger OnRequest if there is already input data.
if !c.inputBuffer.IsEmpty() {
c.onRequest()
}
return nil
}

// AddCloseCallback adds a CloseCallback to this connection.
func (on *onEvent) AddCloseCallback(callback CloseCallback) error {
func (c *connection) AddCloseCallback(callback CloseCallback) error {
if callback == nil {
return nil
}
var cb = &callbackNode{}
cb.fn = callback
if pre := on.closeCallbacks.Load(); pre != nil {
if pre := c.closeCallbacks.Load(); pre != nil {
cb.pre = pre.(*callbackNode)
}
on.closeCallbacks.Store(cb)
c.closeCallbacks.Store(cb)
return nil
}

Expand Down Expand Up @@ -203,6 +208,10 @@ func (c *connection) closeCallback(needLock bool) (err error) {
if needLock && !c.lock(processing) {
return nil
}
// If Close is called during OnPrepare, poll is not registered.
if c.closeBy(user) && c.operator.poll != nil {
c.operator.Control(PollDetach)
}
var latest = c.closeCallbacks.Load()
if latest == nil {
return nil
Expand Down
10 changes: 4 additions & 6 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ func (c *connection) onHup(p Poll) error {
// onClose means close by user.
func (c *connection) onClose() error {
if c.closeBy(user) {
// If Close is called during OnPrepare, poll is not registered.
if c.operator.poll != nil {
c.operator.Control(PollDetach)
}
c.triggerRead()
c.triggerWrite(ErrConnClosed)
c.closeCallback(true)
Expand Down Expand Up @@ -157,7 +153,9 @@ func (c *connection) flush() error {
if err != nil {
return Exception(err, "when flush")
}

err = <-c.writeTrigger
return err
if err != nil {
return Exception(err, "when flush")
}
return nil
}
9 changes: 7 additions & 2 deletions net_polldesc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (pd *pollDesc) WaitWrite(ctx context.Context) error {
pd.operator.poll = pollmanager.Pick()
err = pd.operator.Control(PollWritable)
if err != nil {
pd.operator.Control(PollDetach)
pd.detach()
}
})
if err != nil {
Expand All @@ -68,7 +68,7 @@ func (pd *pollDesc) WaitWrite(ctx context.Context) error {

select {
case <-ctx.Done():
pd.operator.Control(PollDetach)
pd.detach()
return mapErr(ctx.Err())
case <-pd.closeTrigger:
return Exception(ErrConnClosed, "by peer")
Expand All @@ -82,3 +82,8 @@ func (pd *pollDesc) WaitWrite(ctx context.Context) error {
}
}
}

func (pd *pollDesc) detach() {
pd.operator.Control(PollDetach)
pd.operator.unused()
}
21 changes: 21 additions & 0 deletions nocopy_linkbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,27 @@ func TestLinkBufferRefer(t *testing.T) {
Equal(t, buf.read.Len(), block8k-9)
}

func TestLinkBufferResetTail(t *testing.T) {
except := byte(1)

LinkBufferCap = 8
buf := NewLinkBuffer()

// 1. slice reader
buf.WriteByte(except)
buf.Flush()
r1, _ := buf.Slice(1)
fmt.Printf("1: %x\n", buf.flush.buf)
// 2. release & reset tail
buf.resetTail(LinkBufferCap)
buf.WriteByte(byte(2))
fmt.Printf("2: %x\n", buf.flush.buf)

// check slice reader
got, _ := r1.ReadByte()
Equal(t, got, except)
}

func TestWriteBuffer(t *testing.T) {
buf1 := NewLinkBuffer()
buf2 := NewLinkBuffer()
Expand Down
8 changes: 5 additions & 3 deletions poll_default_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func (p *defaultPoll) Wait() (err error) {
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
for i := range events {
var operator = *(**FDOperator)(unsafe.Pointer(&events[i].data))
if !operator.do() {
continue
}
// trigger or exit gracefully
if operator.FD == p.wop.FD {
// must clean trigger first
Expand All @@ -118,11 +121,10 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) {
if p.buf[0] > 0 {
syscall.Close(p.wop.FD)
syscall.Close(p.fd)
operator.done()
return true
}
continue
}
if !operator.do() {
operator.done()
continue
}

Expand Down
2 changes: 1 addition & 1 deletion poll_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type manager struct {
// SetNumLoops will return error when set numLoops < 1
func (m *manager) SetNumLoops(numLoops int) error {
if numLoops < 1 {
return fmt.Errorf("set invaild numLoops[%d]", numLoops)
return fmt.Errorf("set invalid numLoops[%d]", numLoops)
}

if numLoops < m.NumLoops {
Expand Down
7 changes: 3 additions & 4 deletions poll_race_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,11 @@ func (p *defaultPoll) Wait() error {
atomic.StoreUint32(&p.trigger, 0)
continue
}
var operator *FDOperator
if tmp, ok := p.m.Load(fd); ok {
operator = tmp.(*FDOperator)
} else {
tmp, ok := p.m.Load(fd)
if !ok {
continue
}
operator := tmp.(*FDOperator)
if !operator.do() {
continue
}
Expand Down
14 changes: 7 additions & 7 deletions poll_race_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,9 @@ func (p *defaultPoll) Wait() (err error) {

func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) {
for i := range events {
var operator *FDOperator
if tmp, ok := p.m.Load(int(events[i].Fd)); ok {
operator = tmp.(*FDOperator)
} else {
continue
}
var fd = int(events[i].Fd)
// trigger or exit gracefully
if operator.FD == p.wfd {
if fd == p.wfd {
// must clean trigger first
syscall.Read(p.wfd, p.buf)
atomic.StoreUint32(&p.trigger, 0)
Expand All @@ -121,6 +116,11 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) {
}
continue
}
tmp, ok := p.m.Load(fd)
if !ok {
continue
}
operator := tmp.(*FDOperator)
if !operator.do() {
continue
}
Expand Down
17 changes: 13 additions & 4 deletions sys_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func writev(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) {
}
// syscall
r, _, e := syscall.RawSyscall(syscall.SYS_WRITEV, uintptr(fd), uintptr(unsafe.Pointer(&ivs[0])), uintptr(iovLen))
resetIovecs(bs, ivs[:iovLen])
if e != 0 {
return int(r), syscall.Errno(e)
}
Expand All @@ -83,6 +84,7 @@ func readv(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) {
}
// syscall
r, _, e := syscall.RawSyscall(syscall.SYS_READV, uintptr(fd), uintptr(unsafe.Pointer(&ivs[0])), uintptr(iovLen))
resetIovecs(bs, ivs[:iovLen])
if e != 0 {
return int(r), syscall.Errno(e)
}
Expand All @@ -98,15 +100,22 @@ func iovecs(bs [][]byte, ivs []syscall.Iovec) (iovLen int) {
if len(chunk) == 0 {
continue
}
iov := &syscall.Iovec{Base: &chunk[0]}
iov.SetLen(len(chunk))
// append
ivs[iovLen] = *iov
ivs[iovLen].Base = &chunk[0]
ivs[iovLen].SetLen(len(chunk))
iovLen++
}
return iovLen
}

func resetIovecs(bs [][]byte, ivs []syscall.Iovec) {
for i := 0; i < len(bs); i++ {
bs[i] = nil
}
for i := 0; i < len(ivs); i++ {
ivs[i].Base = nil
}
}

// Boolean to int.
func boolint(b bool) int {
if b {
Expand Down
1 change: 1 addition & 0 deletions sys_sendmsg_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func sendmsg(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, er
}
// flags = syscall.MSG_DONTWAIT
r, _, e := syscall.RawSyscall(syscall.SYS_SENDMSG, uintptr(fd), uintptr(unsafe.Pointer(&msghdr)), uintptr(0))
resetIovecs(bs, ivs[:iovLen])
if e != 0 {
return int(r), syscall.Errno(e)
}
Expand Down
1 change: 1 addition & 0 deletions sys_sendmsg_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func sendmsg(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, er
flags = MSG_ZEROCOPY
}
r, _, e := syscall.RawSyscall(syscall.SYS_SENDMSG, uintptr(fd), uintptr(unsafe.Pointer(&msghdr)), flags)
resetIovecs(bs, ivs[:iovLen])
if e != 0 {
return int(r), syscall.Errno(e)
}
Expand Down

0 comments on commit 93eca42

Please sign in to comment.