diff --git a/benchmarks/bench-pingpong.sh b/benchmarks/bench-pingpong.sh index d310734..003188b 100755 --- a/benchmarks/bench-pingpong.sh +++ b/benchmarks/bench-pingpong.sh @@ -9,8 +9,8 @@ echo "" cd $(dirname "${BASH_SOURCE[0]}") function cleanup() { echo "--- BENCH PING PONG DONE ---" - # kill -9 $(jobs -rp) - # wait $(jobs -rp) 2>/dev/null + kill -9 $(jobs -rp) + wait $(jobs -rp) 2>/dev/null } trap cleanup EXIT @@ -26,16 +26,20 @@ function gobench() { if [ "$3" != "" ]; then go build -o $2 $3 fi - GOMAXPROCS=4 $2 --port $4 --loops 8 & + $2 --port $4 --loops 8 & sleep 1 - echo "*** 1000 connections, 60 seconds, 4096 byte packets" - GOMAXPROCS=4 go run client/main.go -c 1000 -t 60 -m 4096 -a 127.0.0.1:$4 + go run client/main.go -c 3000 -t 10 -m 4096 -a 127.0.0.1:$4 + + pkill -9 $2 || printf "" echo "--- DONE ---" echo "" } gobench "GEV" bin/gev-echo-server gev-echo-server/echo.go 5000 -gobench "GNET" bin/gnet-echo-server gnet-echo-server/main.go 5010 +gobench "NET" bin/net-echo-server net-echo-server/main.go 5001 +gobench "EVIO" bin/evio-echo-server evio-echo-server/main.go 5002 + +gobench "GNET" bin/gnet-echo-server gnet-echo-server/main.go 5010 diff --git a/benchmarks/bench-websocket-pingpong.sh b/benchmarks/bench-websocket-pingpong.sh new file mode 100755 index 0000000..28c2ea0 --- /dev/null +++ b/benchmarks/bench-websocket-pingpong.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +set -e + +echo "" +echo "--- BENCH websocket PING PONG START ---" +echo "" + +cd $(dirname "${BASH_SOURCE[0]}") +function cleanup() { + echo "--- BENCH websocket PING PONG DONE ---" + kill -9 $(jobs -rp) + wait $(jobs -rp) 2>/dev/null +} +trap cleanup EXIT + +mkdir -p bin +$(pkill -9 websocket-server || printf "") + +function gobench() { + echo "--- $1 ---" + if [ "$3" != "" ]; then + go build -o $2 $3 + fi + $2 --port $4 --loops 8 & + + sleep 1 + go run websocket/client/main.go -c 3000 -t 5 -m 2048 -a 127.0.0.1:$4 + pkill -9 $2 || printf "" + echo "--- DONE ---" + echo "" +} + +gobench "Gev-websocket" bin/websocket-server websocket/server.go 6000 diff --git a/benchmarks/client/main.go b/benchmarks/client/main.go index e3fd64d..df775a5 100644 --- a/benchmarks/client/main.go +++ b/benchmarks/client/main.go @@ -17,19 +17,23 @@ var msg []byte func main() { flag.Parse() + + fmt.Printf("*** %d connections, %d seconds, %d byte packets ***\n", *num, *timeOut, *msgLen) + msg = make([]byte, *msgLen) rand.Read(msg) startC := make(chan interface{}) closeC := make(chan interface{}) result := make(chan int64, *num) + req := make(chan int64, *num) for i := 0; i < *num; i++ { conn, err := net.Dial("tcp", *addr) if err != nil { panic(err) } - go handler(conn, startC, closeC, result) + go handler(conn, startC, closeC, result, req) } // start @@ -39,16 +43,18 @@ func main() { // stop close(closeC) - var totalMessagesRead int64 + var totalMessagesRead, reqCount int64 for i := 0; i < *num; i++ { totalMessagesRead += <-result + reqCount += <-req } fmt.Println(totalMessagesRead/int64(*timeOut*1024*1024), " MiB/s throughput") + fmt.Println(reqCount/int64(*timeOut), " qps") } -func handler(conn net.Conn, startC chan interface{}, closeC chan interface{}, result chan int64) { - var count int64 +func handler(conn net.Conn, startC chan interface{}, closeC chan interface{}, result, req chan int64) { + var count, reqCount int64 buf := make([]byte, 2*(*msgLen)) <-startC @@ -61,12 +67,14 @@ func handler(conn net.Conn, startC chan interface{}, closeC chan interface{}, re select { case <-closeC: result <- count + req <- reqCount conn.Close() return default: n, err := conn.Read(buf) if n > 0 { count += int64(n) + reqCount++ } if err != nil { fmt.Print("Error to read message because of ", err) diff --git a/benchmarks/evio-echo-server/main.go b/benchmarks/evio-echo-server/main.go index 34ff332..046d306 100644 --- a/benchmarks/evio-echo-server/main.go +++ b/benchmarks/evio-echo-server/main.go @@ -28,7 +28,6 @@ func main() { var events evio.Events events.NumLoops = loops events.Serving = func(srv evio.Server) (action evio.Action) { - log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops) if reuseport { log.Printf("reuseport") } diff --git a/benchmarks/eviop-echo-server/main.go b/benchmarks/eviop-echo-server/main.go index 063176d..c3924db 100644 --- a/benchmarks/eviop-echo-server/main.go +++ b/benchmarks/eviop-echo-server/main.go @@ -34,7 +34,6 @@ func main() { var events eviop.Events events.NumLoops = loops events.Serving = func(srv eviop.Server) (action eviop.Action) { - log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops) if reuseport { log.Printf("reuseport") } diff --git a/benchmarks/gev-echo-server/echo.go b/benchmarks/gev-echo-server/echo.go index 0e9ac63..5cbe88a 100644 --- a/benchmarks/gev-echo-server/echo.go +++ b/benchmarks/gev-echo-server/echo.go @@ -5,6 +5,8 @@ import ( "net/http" "strconv" + "github.com/Allenxuxu/gev/log" + _ "net/http/pprof" "github.com/Allenxuxu/gev" @@ -16,15 +18,26 @@ type example struct { func (s *example) OnConnect(c *connection.Connection) {} func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) { + out = data + + //msg := append([]byte{}, data...) + //go func() { + // if err := c.Send(msg); err != nil { + // //log.Errorf("send error :%v", err) + // } + //}() return } -func (s *example) OnClose(c *connection.Connection) {} +func (s *example) OnClose(c *connection.Connection) { + //log.Error("onclose ") +} func main() { + log.SetLevel(log.LevelDebug) go func() { - if err := http.ListenAndServe(":6061", nil); err != nil { + if err := http.ListenAndServe(":6089", nil); err != nil { panic(err) } }() diff --git a/benchmarks/gnet-echo-server/main.go b/benchmarks/gnet-echo-server/main.go index 8cf6c32..94535de 100644 --- a/benchmarks/gnet-echo-server/main.go +++ b/benchmarks/gnet-echo-server/main.go @@ -5,7 +5,6 @@ import ( "fmt" "log" "net/http" - _ "net/http/pprof" "github.com/panjf2000/gnet" @@ -16,25 +15,20 @@ type echoServer struct { } func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) { - log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n", - srv.Addr.String(), srv.Multicore, srv.NumEventLoop) return } func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) { // Echo synchronously. out = frame - return - /* - // Echo asynchronously. - data := append([]byte{}, frame...) - go func() { - time.Sleep(time.Second) - c.AsyncWrite(data) - }() - return - */ + //// Echo asynchronously. + //data := append([]byte{}, frame...) + //go func() { + // //time.Sleep(time.Second) + // c.AsyncWrite(data) + //}() + return } func main() { diff --git a/benchmarks/websocket/client/main.go b/benchmarks/websocket/client/main.go new file mode 100644 index 0000000..e933bc9 --- /dev/null +++ b/benchmarks/websocket/client/main.go @@ -0,0 +1,96 @@ +package main + +import ( + "bytes" + "errors" + "flag" + "fmt" + "math/rand" + "time" + + "golang.org/x/net/websocket" +) + +var ( + addr = flag.String("a", "localhost:1833", "address") + num = flag.Int("c", 1, "connection number") + timeOut = flag.Int("t", 2, "timeout second") + msgLen = flag.Int("m", 1024, "message length") + + msg []byte +) + +func main() { + flag.Parse() + fmt.Printf("*** %d connections, %d seconds, %d byte packets ***\n", *num, *timeOut, *msgLen) + + msg = make([]byte, *msgLen) + rand.Read(msg) + startC := make(chan interface{}) + closeC := make(chan interface{}) + result := make(chan int64, *num) + req := make(chan int64, *num) + + for i := 0; i < *num; i++ { + go startWebSocketClient(startC, closeC, result, req) + } + + // start + close(startC) + time.Sleep(time.Duration(*timeOut) * time.Second) + // stop + close(closeC) + + var totalMessagesRead, reqCount int64 + for i := 0; i < *num; i++ { + totalMessagesRead += <-result + reqCount += <-req + } + + fmt.Println(totalMessagesRead/int64(*timeOut*1024*1024), " MiB/s throughput") + fmt.Println(reqCount/int64(*timeOut), " qps") +} + +func startWebSocketClient(startC chan interface{}, closeC chan interface{}, result, req chan int64) { + var count, reqCount int64 + buf := make([]byte, 2*(*msgLen)) + + address := "ws://" + *addr + c, err := websocket.Dial(address, "", address) + if err != nil { + panic(err) + } + c.MaxPayloadBytes = *msgLen * 2 + <-startC + + if n, err := c.Write(msg); err != nil || n != len(msg) { + panic(err) + } + + for { + select { + case <-closeC: + result <- count + req <- reqCount + c.Close() + return + default: + n, err := c.Read(buf) + if err != nil || n != len(msg) { + fmt.Printf("read error %v %d", err, n) + panic(errors.New("read error")) + } + if !bytes.Equal(msg, buf[:n]) { + panic("mismatch") + } + + count += int64(n) + reqCount++ + + _, err = c.Write(msg) + if err != nil { + fmt.Println("Error to send message because of ", err.Error()) + } + } + } +} diff --git a/benchmarks/websocket/server.go b/benchmarks/websocket/server.go new file mode 100644 index 0000000..295ae01 --- /dev/null +++ b/benchmarks/websocket/server.go @@ -0,0 +1,72 @@ +package main + +import ( + "flag" + "strconv" + + "github.com/Allenxuxu/gev/log" + + "github.com/Allenxuxu/gev" + "github.com/Allenxuxu/gev/connection" + "github.com/Allenxuxu/gev/plugins/websocket" + "github.com/Allenxuxu/gev/plugins/websocket/ws" +) + +type example struct { +} + +// connection lifecycle +// OnConnect() -> OnRequest() -> OnHeader() -> OnMessage() -> OnClose() + +func (s *example) OnConnect(c *connection.Connection) { + //log.Println("OnConnect: ", c.PeerAddr()) +} + +func (s *example) OnMessage(c *connection.Connection, data []byte) (messageType ws.MessageType, out []byte) { + //log.Println("OnMessage: ", string(data)) + + messageType = ws.MessageBinary + out = data + + return +} + +func (s *example) OnClose(c *connection.Connection) { + //log.Println("123 OnClose", c.PeerAddr()) +} + +// NewWebSocketServer 创建 WebSocket Server +func NewWebSocketServer(handler websocket.WSHandler, u *ws.Upgrader, opts ...gev.Option) (server *gev.Server, err error) { + opts = append(opts, gev.Protocol(websocket.New(u))) + return gev.NewServer(websocket.NewHandlerWrap(u, handler), opts...) +} + +func main() { + log.SetLevel(log.LevelDebug) + var ( + port int + loops int + ) + + flag.IntVar(&port, "port", 1833, "server port") + flag.IntVar(&loops, "loops", -1, "num loops") + flag.Parse() + + handler := &example{} + wsUpgrader := &ws.Upgrader{} + //wsUpgrader.OnRequest = func(c *connection.Connection, uri []byte) error { + // log.Println("OnRequest: ", string(uri)) + // + // return nil + //} + + s, err := NewWebSocketServer(handler, wsUpgrader, + gev.Network("tcp"), + gev.Address(":"+strconv.Itoa(port)), + gev.NumLoops(loops)) + if err != nil { + panic(err) + } + + s.Start() +} diff --git a/eventloop/eventloop.go b/eventloop/eventloop.go index f642020..6fe85f9 100644 --- a/eventloop/eventloop.go +++ b/eventloop/eventloop.go @@ -26,13 +26,15 @@ type EventLoop struct { // nolint type eventLoopLocal struct { - eventHandling atomic.Bool - poll *poller.Poller - mu spinlock.SpinLock - sockets map[int]Socket - packet []byte - pendingFunc []func() - UserBuffer *[]byte + needWake *atomic.Bool + poll *poller.Poller + mu spinlock.SpinLock + sockets map[int]Socket + packet []byte + taskQueueW []func() + taskQueueR []func() + + UserBuffer *[]byte } // New 创建一个 EventLoop @@ -49,6 +51,9 @@ func New() (*EventLoop, error) { packet: make([]byte, 0xFFFF), sockets: make(map[int]Socket), UserBuffer: &userBuffer, + needWake: atomic.New(true), + taskQueueW: make([]func(), 0, 1024), + taskQueueR: make([]func(), 0, 1024), }, }, nil } @@ -110,10 +115,10 @@ func (l *EventLoop) Stop() error { // QueueInLoop 添加 func 到事件循环中执行 func (l *EventLoop) QueueInLoop(f func()) { l.mu.Lock() - l.pendingFunc = append(l.pendingFunc, f) + l.taskQueueW = append(l.taskQueueW, f) l.mu.Unlock() - if !l.eventHandling.Get() { + if l.needWake.CompareAndSwap(true, false) { if err := l.poll.Wake(); err != nil { log.Error("QueueInLoop Wake loop, ", err) } @@ -121,29 +126,26 @@ func (l *EventLoop) QueueInLoop(f func()) { } func (l *EventLoop) handlerEvent(fd int, events poller.Event) { - l.eventHandling.Set(true) - if fd != -1 { s, ok := l.sockets[fd] if ok { s.HandleEvent(fd, events) } + } else { + l.needWake.Set(true) + l.doPendingFunc() } - - l.eventHandling.Set(false) - - l.doPendingFunc() } func (l *EventLoop) doPendingFunc() { l.mu.Lock() - pf := l.pendingFunc - l.pendingFunc = nil + l.taskQueueW, l.taskQueueR = l.taskQueueR, l.taskQueueW l.mu.Unlock() - length := len(pf) - + length := len(l.taskQueueR) for i := 0; i < length; i++ { - pf[i]() + l.taskQueueR[i]() } + + l.taskQueueR = l.taskQueueR[:0] } diff --git a/go.mod b/go.mod index 0a7b095..46da092 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.14 require ( github.com/Allenxuxu/eviop v0.0.0-20190901123806-035c218f739a github.com/Allenxuxu/ringbuffer v0.0.9 - github.com/Allenxuxu/toolkit v0.0.0-20200827004847-cb9a6f0d072f + github.com/Allenxuxu/toolkit v0.0.1 github.com/RussellLuo/timingwheel v0.0.0-20201029015908-64de9d088c74 github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee github.com/gobwas/pool v0.2.0 diff --git a/go.sum b/go.sum index 07c976e..d3186db 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ github.com/Allenxuxu/eviop v0.0.0-20190901123806-035c218f739a/go.mod h1:I5+IvzRy github.com/Allenxuxu/ringbuffer v0.0.0-20190803184500-fa400f2fe92b/go.mod h1:9Rg4D7ixiHGlU50BJWJEg6vwDFcGiOYKQFcHK6Vx9m4= github.com/Allenxuxu/ringbuffer v0.0.9 h1:KxG2kqM1iCL0VqTTuQS6XqugoffP8IM7YhVnzEPf8cc= github.com/Allenxuxu/ringbuffer v0.0.9/go.mod h1:F2Ela+/miJmKYwnXr3X0+spOmSEwL/iFAEzeUJ4SFMI= -github.com/Allenxuxu/toolkit v0.0.0-20200827004847-cb9a6f0d072f h1:DqHu2M6MxT+R6KN2UdsWjcZ6Yhw4ENSKYlXTKZ24P+c= -github.com/Allenxuxu/toolkit v0.0.0-20200827004847-cb9a6f0d072f/go.mod h1:kamv5tj0iNT29zmKIYaxoIcYgDnzerxnOZiHBKbVp/o= +github.com/Allenxuxu/toolkit v0.0.1 h1:xY4AK/nmjxQC1sVbolUUqVeH27+TalfCPLd85y2VfS0= +github.com/Allenxuxu/toolkit v0.0.1/go.mod h1:kamv5tj0iNT29zmKIYaxoIcYgDnzerxnOZiHBKbVp/o= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= diff --git a/server_test.go b/server_test.go index 813e0d2..3607a59 100644 --- a/server_test.go +++ b/server_test.go @@ -28,7 +28,8 @@ func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []by //log.Println("OnMessage") //out = data - if err := c.Send(data); err != nil { + msg := append([]byte{}, data...) + if err := c.Send(msg); err != nil { panic(err) } return