Skip to content

Commit

Permalink
optimize performance & update benchmarks (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
Allenxuxu authored Mar 15, 2021
1 parent e1348b1 commit 03ff49f
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 152 deletions.
44 changes: 0 additions & 44 deletions benchmarks/bench-echo.sh

This file was deleted.

38 changes: 18 additions & 20 deletions benchmarks/bench-pingpong.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ echo "--- BENCH PING PONG START ---"
echo ""

cd $(dirname "${BASH_SOURCE[0]}")
function cleanup {
echo "--- BENCH PING PONG DONE ---"
kill -9 $(jobs -rp)
wait $(jobs -rp) 2>/dev/null
function cleanup() {
echo "--- BENCH PING PONG DONE ---"
# kill -9 $(jobs -rp)
# wait $(jobs -rp) 2>/dev/null
}
trap cleanup EXIT

Expand All @@ -21,23 +21,21 @@ $(pkill -9 eviop-echo-server || printf "")
$(pkill -9 gev-echo-server || printf "")
$(pkill -9 gnet-echo-server || printf "")

function gobench {
echo "--- $1 ---"
if [ "$3" != "" ]; then
go build -o $2 $3
fi
GOMAXPROCS=4 $2 --port $4 --loops 4 &

sleep 1
echo "*** 1000 connections, 10 seconds, 4096 byte packets"
GOMAXPROCS=4 go run client/main.go -c 1000 -t 10 -m 4096 -a 127.0.0.1:$4
echo "--- DONE ---"
echo ""
function gobench() {
echo "--- $1 ---"
if [ "$3" != "" ]; then
go build -o $2 $3
fi
GOMAXPROCS=4 $2 --port $4 --loops 8 &

sleep 1
echo "*** 1000 connections, 60 seconds, 4096 byte packets"
GOMAXPROCS=4 go run client/main.go -c 1000 -t 60 -m 4096 -a 127.0.0.1:$4
echo "--- DONE ---"
echo ""
}

gobench "GEV" bin/gev-echo-server echo/echo.go 5000
gobench "GNET" bin/gnet-echo-server gnet-echo-server/main.go 5001
gobench "GO STDLIB" bin/net-echo-server net-echo-server/main.go 5004
gobench "GEV" bin/gev-echo-server gev-echo-server/echo.go 5000

exit 0
gobench "GNET" bin/gnet-echo-server gnet-echo-server/main.go 5010

Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package main

import (
"flag"
"net/http"
"strconv"

_ "net/http/pprof"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
)
Expand All @@ -20,6 +23,12 @@ func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []by
func (s *example) OnClose(c *connection.Connection) {}

func main() {
go func() {
if err := http.ListenAndServe(":6061", nil); err != nil {
panic(err)
}
}()

handler := new(example)
var port int
var loops int
Expand Down
63 changes: 37 additions & 26 deletions benchmarks/gnet-echo-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,46 @@ import (
"flag"
"fmt"
"log"
"strings"
"net/http"

_ "net/http/pprof"

"github.com/panjf2000/gnet"
"github.com/panjf2000/gnet/ringbuffer"
)

type echoServer struct {
*gnet.EventServer
}

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}

func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
// Echo synchronously.
out = frame
return

/*
// Echo asynchronously.
data := append([]byte{}, frame...)
go func() {
time.Sleep(time.Second)
c.AsyncWrite(data)
}()
return
*/
}

func main() {
go func() {
if err := http.ListenAndServe(":6062", nil); err != nil {
panic(err)
}
}()

var port int
var loops int
var udp bool
Expand All @@ -24,28 +57,6 @@ func main() {
flag.IntVar(&loops, "loops", -1, "num loops")
flag.Parse()

var events gnet.Events
events.NumLoops = loops
events.OnInitComplete = func(srv gnet.Server) (action gnet.Action) {
log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops)
if reuseport {
log.Printf("reuseport")
}
return
}
events.React = func(c gnet.Conn, inBuf *ringbuffer.RingBuffer) (out []byte, action gnet.Action) {
top, tail := inBuf.PreReadAll()
out = append(top, tail...)
inBuf.Reset()

if trace {
log.Printf("%s", strings.TrimSpace(string(top)+string(tail)))
}
return
}
scheme := "tcp"
if udp {
scheme = "udp"
}
log.Fatal(gnet.Serve(events, fmt.Sprintf("%s://:%d", scheme, port)))
echo := new(echoServer)
log.Fatal(gnet.Serve(echo, fmt.Sprintf("tcp://:%d", port), gnet.WithNumEventLoop(loops), gnet.WithReusePort(reuseport)))
}
35 changes: 0 additions & 35 deletions benchmarks/test.sh

This file was deleted.

43 changes: 30 additions & 13 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ type CallBack interface {
type Connection struct {
fd int
connected atomic.Bool
buffer *ringbuffer.RingBuffer
outBuffer *ringbuffer.RingBuffer // write buffer
outBufferLen atomic.Int64
inBuffer *ringbuffer.RingBuffer // read buffer
outBufferLen atomic.Int64
inBufferLen atomic.Int64
callBack CallBack
loop *eventloop.EventLoop
Expand All @@ -45,7 +46,13 @@ type Connection struct {
var ErrConnectionClosed = errors.New("connection closed")

// New 创建 Connection
func New(fd int, loop *eventloop.EventLoop, sa unix.Sockaddr, protocol Protocol, tw *timingwheel.TimingWheel, idleTime time.Duration, callBack CallBack) *Connection {
func New(fd int,
loop *eventloop.EventLoop,
sa unix.Sockaddr,
protocol Protocol,
tw *timingwheel.TimingWheel,
idleTime time.Duration,
callBack CallBack) *Connection {
conn := &Connection{
fd: fd,
peerAddr: sockAddrToString(sa),
Expand All @@ -56,6 +63,7 @@ func New(fd int, loop *eventloop.EventLoop, sa unix.Sockaddr, protocol Protocol,
idleTime: idleTime,
timingWheel: tw,
protocol: protocol,
buffer: ringbuffer.New(0),
}
conn.connected.Set(true)

Expand All @@ -67,6 +75,10 @@ func New(fd int, loop *eventloop.EventLoop, sa unix.Sockaddr, protocol Protocol,
return conn
}

func (c *Connection) UserBuffer() *[]byte {
return c.loop.UserBuffer
}

func (c *Connection) closeTimeoutConn() func() {
return func() {
now := time.Now()
Expand Down Expand Up @@ -143,22 +155,27 @@ func (c *Connection) WriteBufferLength() int64 {

// HandleEvent 内部使用,event loop 回调
func (c *Connection) HandleEvent(fd int, events poller.Event) {
now := time.Now()
if c.idleTime > 0 {
_ = c.activeTime.Swap(now.Unix())
_ = c.activeTime.Swap(time.Now().Unix())
}

if events&poller.EventErr != 0 {
c.handleClose(fd)
return
}

if c.outBuffer.Length() != 0 {
if !c.outBuffer.IsEmpty() {
if events&poller.EventWrite != 0 {
c.handleWrite(fd)
if c.outBuffer.IsEmpty() {
c.outBuffer.Reset()
}
}
} else if events&poller.EventRead != 0 {
c.handleRead(fd)
if c.inBuffer.IsEmpty() {
c.inBuffer.Reset()
}
}

c.inBufferLen.Swap(int64(c.inBuffer.Length()))
Expand Down Expand Up @@ -188,13 +205,13 @@ func (c *Connection) handleRead(fd int) {
return
}

if c.inBuffer.Length() == 0 {
buffer := ringbuffer.NewWithData(buf[:n])
if c.inBuffer.IsEmpty() {
c.buffer.WithData(buf[:n])
buf = buf[n:n]
c.handlerProtocol(&buf, buffer)
c.handlerProtocol(&buf, c.buffer)

if buffer.Length() > 0 {
first, _ := buffer.PeekAll()
if !c.buffer.IsEmpty() {
first, _ := c.buffer.PeekAll()
_, _ = c.inBuffer.Write(first)
}
} else {
Expand Down Expand Up @@ -232,7 +249,7 @@ func (c *Connection) handleWrite(fd int) {
c.outBuffer.Retrieve(n)
}

if c.outBuffer.Length() == 0 {
if c.outBuffer.IsEmpty() {
if err := c.loop.EnableRead(fd); err != nil {
log.Error("[EnableRead]", err)
}
Expand All @@ -255,7 +272,7 @@ func (c *Connection) handleClose(fd int) {
}

func (c *Connection) sendInLoop(data []byte) {
if c.outBuffer.Length() > 0 {
if !c.outBuffer.IsEmpty() {
_, _ = c.outBuffer.Write(data)
} else {
n, err := unix.Write(c.fd, data)
Expand All @@ -270,7 +287,7 @@ func (c *Connection) sendInLoop(data []byte) {
_, _ = c.outBuffer.Write(data[n:])
}

if c.outBuffer.Length() > 0 {
if !c.outBuffer.IsEmpty() {
_ = c.loop.EnableReadWrite(c.fd)
}
}
Expand Down
21 changes: 18 additions & 3 deletions connection/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,24 @@ type DefaultProtocol struct{}

// UnPacket 拆包
func (d *DefaultProtocol) UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
ret := buffer.Bytes()
buffer.RetrieveAll()
return nil, ret
s, e := buffer.PeekAll()
if len(e) > 0 {
size := len(s) + len(e)
userBuffer := *c.UserBuffer()
if size > cap(userBuffer) {
userBuffer = make([]byte, size)
*c.UserBuffer() = userBuffer
}

copy(userBuffer, s)
copy(userBuffer[len(s):], e)

return nil, userBuffer
} else {
buffer.RetrieveAll()

return nil, s
}
}

// Packet 封包
Expand Down
Loading

0 comments on commit 03ff49f

Please sign in to comment.