Skip to content

Commit

Permalink
optimize performance (#71)
Browse files Browse the repository at this point in the history
* fix unit test
* Improve eventloop performance
* update benchmarks
  • Loading branch information
Allenxuxu authored Mar 18, 2021
1 parent 03ff49f commit 22a5f43
Show file tree
Hide file tree
Showing 13 changed files with 273 additions and 51 deletions.
16 changes: 10 additions & 6 deletions benchmarks/bench-pingpong.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ echo ""
cd $(dirname "${BASH_SOURCE[0]}")
function cleanup() {
echo "--- BENCH PING PONG DONE ---"
# kill -9 $(jobs -rp)
# wait $(jobs -rp) 2>/dev/null
kill -9 $(jobs -rp)
wait $(jobs -rp) 2>/dev/null
}
trap cleanup EXIT

Expand All @@ -26,16 +26,20 @@ function gobench() {
if [ "$3" != "" ]; then
go build -o $2 $3
fi
GOMAXPROCS=4 $2 --port $4 --loops 8 &
$2 --port $4 --loops 8 &

sleep 1
echo "*** 1000 connections, 60 seconds, 4096 byte packets"
GOMAXPROCS=4 go run client/main.go -c 1000 -t 60 -m 4096 -a 127.0.0.1:$4
go run client/main.go -c 3000 -t 10 -m 4096 -a 127.0.0.1:$4

pkill -9 $2 || printf ""
echo "--- DONE ---"
echo ""
}

gobench "GEV" bin/gev-echo-server gev-echo-server/echo.go 5000

gobench "GNET" bin/gnet-echo-server gnet-echo-server/main.go 5010
gobench "NET" bin/net-echo-server net-echo-server/main.go 5001

gobench "EVIO" bin/evio-echo-server evio-echo-server/main.go 5002

gobench "GNET" bin/gnet-echo-server gnet-echo-server/main.go 5010
34 changes: 34 additions & 0 deletions benchmarks/bench-websocket-pingpong.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/bin/bash

set -e

echo ""
echo "--- BENCH websocket PING PONG START ---"
echo ""

cd $(dirname "${BASH_SOURCE[0]}")
function cleanup() {
echo "--- BENCH websocket PING PONG DONE ---"
kill -9 $(jobs -rp)
wait $(jobs -rp) 2>/dev/null
}
trap cleanup EXIT

mkdir -p bin
$(pkill -9 websocket-server || printf "")

function gobench() {
echo "--- $1 ---"
if [ "$3" != "" ]; then
go build -o $2 $3
fi
$2 --port $4 --loops 8 &

sleep 1
go run websocket/client/main.go -c 3000 -t 5 -m 2048 -a 127.0.0.1:$4
pkill -9 $2 || printf ""
echo "--- DONE ---"
echo ""
}

gobench "Gev-websocket" bin/websocket-server websocket/server.go 6000
16 changes: 12 additions & 4 deletions benchmarks/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,23 @@ var msg []byte

func main() {
flag.Parse()

fmt.Printf("*** %d connections, %d seconds, %d byte packets ***\n", *num, *timeOut, *msgLen)

msg = make([]byte, *msgLen)
rand.Read(msg)

startC := make(chan interface{})
closeC := make(chan interface{})
result := make(chan int64, *num)
req := make(chan int64, *num)

for i := 0; i < *num; i++ {
conn, err := net.Dial("tcp", *addr)
if err != nil {
panic(err)
}
go handler(conn, startC, closeC, result)
go handler(conn, startC, closeC, result, req)
}

// start
Expand All @@ -39,16 +43,18 @@ func main() {
// stop
close(closeC)

var totalMessagesRead int64
var totalMessagesRead, reqCount int64
for i := 0; i < *num; i++ {
totalMessagesRead += <-result
reqCount += <-req
}

fmt.Println(totalMessagesRead/int64(*timeOut*1024*1024), " MiB/s throughput")
fmt.Println(reqCount/int64(*timeOut), " qps")
}

func handler(conn net.Conn, startC chan interface{}, closeC chan interface{}, result chan int64) {
var count int64
func handler(conn net.Conn, startC chan interface{}, closeC chan interface{}, result, req chan int64) {
var count, reqCount int64
buf := make([]byte, 2*(*msgLen))
<-startC

Expand All @@ -61,12 +67,14 @@ func handler(conn net.Conn, startC chan interface{}, closeC chan interface{}, re
select {
case <-closeC:
result <- count
req <- reqCount
conn.Close()
return
default:
n, err := conn.Read(buf)
if n > 0 {
count += int64(n)
reqCount++
}
if err != nil {
fmt.Print("Error to read message because of ", err)
Expand Down
1 change: 0 additions & 1 deletion benchmarks/evio-echo-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func main() {
var events evio.Events
events.NumLoops = loops
events.Serving = func(srv evio.Server) (action evio.Action) {
log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops)
if reuseport {
log.Printf("reuseport")
}
Expand Down
1 change: 0 additions & 1 deletion benchmarks/eviop-echo-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func main() {
var events eviop.Events
events.NumLoops = loops
events.Serving = func(srv eviop.Server) (action eviop.Action) {
log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops)
if reuseport {
log.Printf("reuseport")
}
Expand Down
17 changes: 15 additions & 2 deletions benchmarks/gev-echo-server/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"net/http"
"strconv"

"github.com/Allenxuxu/gev/log"

_ "net/http/pprof"

"github.com/Allenxuxu/gev"
Expand All @@ -16,15 +18,26 @@ type example struct {

func (s *example) OnConnect(c *connection.Connection) {}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {

out = data

//msg := append([]byte{}, data...)
//go func() {
// if err := c.Send(msg); err != nil {
// //log.Errorf("send error :%v", err)
// }
//}()
return
}

func (s *example) OnClose(c *connection.Connection) {}
func (s *example) OnClose(c *connection.Connection) {
//log.Error("onclose ")
}

func main() {
log.SetLevel(log.LevelDebug)
go func() {
if err := http.ListenAndServe(":6061", nil); err != nil {
if err := http.ListenAndServe(":6089", nil); err != nil {
panic(err)
}
}()
Expand Down
20 changes: 7 additions & 13 deletions benchmarks/gnet-echo-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"log"
"net/http"

_ "net/http/pprof"

"github.com/panjf2000/gnet"
Expand All @@ -16,25 +15,20 @@ type echoServer struct {
}

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}

func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
// Echo synchronously.
out = frame
return

/*
// Echo asynchronously.
data := append([]byte{}, frame...)
go func() {
time.Sleep(time.Second)
c.AsyncWrite(data)
}()
return
*/
//// Echo asynchronously.
//data := append([]byte{}, frame...)
//go func() {
// //time.Sleep(time.Second)
// c.AsyncWrite(data)
//}()
return
}

func main() {
Expand Down
96 changes: 96 additions & 0 deletions benchmarks/websocket/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package main

import (
"bytes"
"errors"
"flag"
"fmt"
"math/rand"
"time"

"golang.org/x/net/websocket"
)

var (
addr = flag.String("a", "localhost:1833", "address")
num = flag.Int("c", 1, "connection number")
timeOut = flag.Int("t", 2, "timeout second")
msgLen = flag.Int("m", 1024, "message length")

msg []byte
)

func main() {
flag.Parse()
fmt.Printf("*** %d connections, %d seconds, %d byte packets ***\n", *num, *timeOut, *msgLen)

msg = make([]byte, *msgLen)
rand.Read(msg)
startC := make(chan interface{})
closeC := make(chan interface{})
result := make(chan int64, *num)
req := make(chan int64, *num)

for i := 0; i < *num; i++ {
go startWebSocketClient(startC, closeC, result, req)
}

// start
close(startC)
time.Sleep(time.Duration(*timeOut) * time.Second)
// stop
close(closeC)

var totalMessagesRead, reqCount int64
for i := 0; i < *num; i++ {
totalMessagesRead += <-result
reqCount += <-req
}

fmt.Println(totalMessagesRead/int64(*timeOut*1024*1024), " MiB/s throughput")
fmt.Println(reqCount/int64(*timeOut), " qps")
}

func startWebSocketClient(startC chan interface{}, closeC chan interface{}, result, req chan int64) {
var count, reqCount int64
buf := make([]byte, 2*(*msgLen))

address := "ws://" + *addr
c, err := websocket.Dial(address, "", address)
if err != nil {
panic(err)
}
c.MaxPayloadBytes = *msgLen * 2
<-startC

if n, err := c.Write(msg); err != nil || n != len(msg) {
panic(err)
}

for {
select {
case <-closeC:
result <- count
req <- reqCount
c.Close()
return
default:
n, err := c.Read(buf)
if err != nil || n != len(msg) {
fmt.Printf("read error %v %d", err, n)
panic(errors.New("read error"))
}
if !bytes.Equal(msg, buf[:n]) {
panic("mismatch")
}

count += int64(n)
reqCount++

_, err = c.Write(msg)
if err != nil {
fmt.Println("Error to send message because of ", err.Error())
}
}
}
}
72 changes: 72 additions & 0 deletions benchmarks/websocket/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"flag"
"strconv"

"github.com/Allenxuxu/gev/log"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/gev/plugins/websocket"
"github.com/Allenxuxu/gev/plugins/websocket/ws"
)

type example struct {
}

// connection lifecycle
// OnConnect() -> OnRequest() -> OnHeader() -> OnMessage() -> OnClose()

func (s *example) OnConnect(c *connection.Connection) {
//log.Println("OnConnect: ", c.PeerAddr())
}

func (s *example) OnMessage(c *connection.Connection, data []byte) (messageType ws.MessageType, out []byte) {
//log.Println("OnMessage: ", string(data))

messageType = ws.MessageBinary
out = data

return
}

func (s *example) OnClose(c *connection.Connection) {
//log.Println("123 OnClose", c.PeerAddr())
}

// NewWebSocketServer 创建 WebSocket Server
func NewWebSocketServer(handler websocket.WSHandler, u *ws.Upgrader, opts ...gev.Option) (server *gev.Server, err error) {
opts = append(opts, gev.Protocol(websocket.New(u)))
return gev.NewServer(websocket.NewHandlerWrap(u, handler), opts...)
}

func main() {
log.SetLevel(log.LevelDebug)
var (
port int
loops int
)

flag.IntVar(&port, "port", 1833, "server port")
flag.IntVar(&loops, "loops", -1, "num loops")
flag.Parse()

handler := &example{}
wsUpgrader := &ws.Upgrader{}
//wsUpgrader.OnRequest = func(c *connection.Connection, uri []byte) error {
// log.Println("OnRequest: ", string(uri))
//
// return nil
//}

s, err := NewWebSocketServer(handler, wsUpgrader,
gev.Network("tcp"),
gev.Address(":"+strconv.Itoa(port)),
gev.NumLoops(loops))
if err != nil {
panic(err)
}

s.Start()
}
Loading

0 comments on commit 22a5f43

Please sign in to comment.