diff --git a/client_test.go b/client_test.go index e91a2213c..a47ca762a 100644 --- a/client_test.go +++ b/client_test.go @@ -471,57 +471,6 @@ func TestServeWithGnetClient(t *testing.T) { }) }) }) - - t.Run("poll-reuseaddr", func(t *testing.T) { - t.Run("tcp", func(t *testing.T) { - t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin) - }) - t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections) - }) - }) - t.Run("tcp-async", func(t *testing.T) { - t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9991", false, true, false, true, 10, RoundRobin) - }) - t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections) - }) - }) - t.Run("udp", func(t *testing.T) { - t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin) - }) - t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9992", false, true, true, false, 10, LeastConnections) - }) - }) - t.Run("udp-async", func(t *testing.T) { - t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin) - }) - t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9992", false, true, true, true, 10, LeastConnections) - }) - }) - t.Run("unix", func(t *testing.T) { - t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet1.sock", false, true, false, false, 10, RoundRobin) - }) - t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet2.sock", false, true, true, false, 10, LeastConnections) - }) - }) - t.Run("unix-async", func(t *testing.T) { - t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet1.sock", false, true, false, true, 10, RoundRobin) - }) - t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet2.sock", false, true, true, true, 10, LeastConnections) - }) - }) - }) } type testClientServer struct { diff --git a/connection_unix.go b/connection_unix.go index 6443b3299..6b5174e05 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -57,8 +57,8 @@ func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, codec ICodec, localAddr codec: codec, localAddr: localAddr, remoteAddr: remoteAddr, - inboundBuffer: rbPool.GetWithSize(ringbuffer.TCPReadBufferSize), - outboundBuffer: mixedbuffer.New(mixedbuffer.MaxStackingBytes), + inboundBuffer: rbPool.GetWithSize(ringbuffer.MaxStreamBufferCap), + outboundBuffer: mixedbuffer.New(ringbuffer.MaxStreamBufferCap), } c.pollAttachment = netpoll.GetPollAttachment() c.pollAttachment.FD, c.pollAttachment.Callback = fd, c.handleEvents @@ -201,8 +201,8 @@ func (c *conn) writev(bs [][]byte) (err error) { for i := range c.packets { np := len(c.packets[i]) if n < np { - pos = i c.packets[i] = c.packets[i][n:] + pos = i break } n -= np diff --git a/gnet.go b/gnet.go index abcf126f1..a2914a2cf 100644 --- a/gnet.go +++ b/gnet.go @@ -290,12 +290,12 @@ func Serve(eventHandler EventHandler, protoAddr string, opts ...Option) (err err rbc := options.ReadBufferCap switch { case rbc <= 0: - options.ReadBufferCap = ringbuffer.TCPReadBufferSize + options.ReadBufferCap = ringbuffer.MaxStreamBufferCap case rbc <= ringbuffer.DefaultBufferSize: options.ReadBufferCap = ringbuffer.DefaultBufferSize default: options.ReadBufferCap = toolkit.CeilToPowerOfTwo(rbc) - ringbuffer.TCPReadBufferSize = options.ReadBufferCap + ringbuffer.MaxStreamBufferCap = options.ReadBufferCap } network, addr := parseProtoAddr(protoAddr) diff --git a/gnet_test.go b/gnet_test.go index 13e7a1afd..802ad1b15 100644 --- a/gnet_test.go +++ b/gnet_test.go @@ -390,50 +390,66 @@ func TestServe(t *testing.T) { t.Run("poll-reuseport", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "tcp", ":9991", true, true, false, false, false, 10, RoundRobin) + testServe(t, "tcp", ":9991", true, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "tcp", ":9992", true, true, true, false, false, 10, LeastConnections) + testServe(t, "tcp", ":9992", true, false, true, false, false, 10, LeastConnections) }) }) t.Run("tcp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "tcp", ":9991", true, true, false, true, false, 10, RoundRobin) + testServe(t, "tcp", ":9991", true, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "tcp", ":9992", true, true, true, false, false, 10, LeastConnections) + testServe(t, "tcp", ":9992", true, false, true, true, false, 10, LeastConnections) + }) + }) + t.Run("tcp-async-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + testServe(t, "tcp", ":9991", true, false, false, true, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + testServe(t, "tcp", ":9992", true, false, true, true, true, 10, LeastConnections) }) }) t.Run("udp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "udp", ":9991", true, true, false, false, false, 10, RoundRobin) + testServe(t, "udp", ":9991", true, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "udp", ":9992", true, true, true, false, false, 10, LeastConnections) + testServe(t, "udp", ":9992", true, false, true, false, false, 10, LeastConnections) }) }) t.Run("udp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "udp", ":9991", true, true, false, false, false, 10, RoundRobin) + testServe(t, "udp", ":9991", true, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "udp", ":9992", true, true, true, true, false, 10, LeastConnections) + testServe(t, "udp", ":9992", true, false, true, true, false, 10, LeastConnections) }) }) t.Run("unix", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "unix", "gnet1.sock", true, true, false, false, false, 10, RoundRobin) + testServe(t, "unix", "gnet1.sock", true, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "unix", "gnet2.sock", true, true, true, false, false, 10, LeastConnections) + testServe(t, "unix", "gnet2.sock", true, false, true, false, false, 10, LeastConnections) }) }) t.Run("unix-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "unix", "gnet1.sock", true, true, false, true, false, 10, RoundRobin) + testServe(t, "unix", "gnet1.sock", true, false, false, true, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + testServe(t, "unix", "gnet2.sock", true, false, true, true, false, 10, LeastConnections) + }) + }) + t.Run("unix-async-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + testServe(t, "unix", "gnet1.sock", true, false, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "unix", "gnet2.sock", true, true, true, true, false, 10, LeastConnections) + testServe(t, "unix", "gnet2.sock", true, false, true, true, true, 10, LeastConnections) }) }) }) diff --git a/pkg/mixedbuffer/mixed_ring_list_buffer.go b/pkg/mixedbuffer/mixed_ring_list_buffer.go index 4f318eaec..8d7cbacbb 100644 --- a/pkg/mixedbuffer/mixed_ring_list_buffer.go +++ b/pkg/mixedbuffer/mixed_ring_list_buffer.go @@ -20,9 +20,6 @@ import ( "github.com/panjf2000/gnet/pkg/ringbuffer" ) -// MaxStackingBytes is the maximum amount which is allowed to be piled up in the ring-buffer. -const MaxStackingBytes = 32 * 1024 // 32KB - // Buffer combines ring-buffer and list-buffer. // Ring-buffer is the top-priority buffer to store response data, gnet will only switch to // list-buffer if the data size of ring-buffer reaches the maximum(MaxStackingBytes), list-buffer is more @@ -35,7 +32,7 @@ type Buffer struct { // New instantiates a mixedbuffer.Buffer and returns it. func New(maxTopBufCap int) *Buffer { - return &Buffer{maxStackingBytes: maxTopBufCap, ringBuffer: rbPool.Get()} + return &Buffer{maxStackingBytes: maxTopBufCap, ringBuffer: rbPool.GetWithSize(maxTopBufCap)} } // Peek returns all bytes as [][]byte, these bytes won't be discarded until Buffer.Discard() is called. @@ -61,31 +58,43 @@ func (mb *Buffer) Write(p []byte) (n int, err error) { mb.listBuffer.PushBytesBack(p) return len(p), nil } - n, err = mb.ringBuffer.Write(p) - if n > mb.maxStackingBytes { - mb.maxStackingBytes = n + freeSize := mb.ringBuffer.Free() + if len(p) > freeSize { + n, err = mb.ringBuffer.Write(p[:freeSize]) + mb.listBuffer.PushBytesBack(p[n:]) + return } - return + return mb.ringBuffer.Write(p) } // Writev appends multiple byte slices to this buffer. func (mb *Buffer) Writev(bs [][]byte) (int, error) { - var n int if !mb.listBuffer.IsEmpty() || mb.ringBuffer.Length() >= mb.maxStackingBytes { + var n int for _, b := range bs { mb.listBuffer.PushBytesBack(b) n += len(b) } return n, nil } - for _, b := range bs { - _, _ = mb.ringBuffer.Write(b) - n += len(b) + var pos, sum int + freeSize := mb.ringBuffer.Free() + for i, b := range bs { + pos = i + sum += len(b) + if len(b) > freeSize { + n, _ := mb.ringBuffer.Write(b[:freeSize]) + mb.listBuffer.PushBytesBack(b[n:]) + break + } + n, _ := mb.ringBuffer.Write(b) + freeSize -= n } - if n > mb.maxStackingBytes { - mb.maxStackingBytes = n + for pos++; pos < len(bs); pos++ { + sum += len(bs[pos]) + mb.listBuffer.PushBytesBack(bs[pos]) } - return n, nil + return sum, nil } // IsEmpty indicates whether this buffer is empty. diff --git a/pkg/pool/ringbuffer/ringbuffer.go b/pkg/pool/ringbuffer/ringbuffer.go index 33c82a7a6..6e44ab459 100644 --- a/pkg/pool/ringbuffer/ringbuffer.go +++ b/pkg/pool/ringbuffer/ringbuffer.go @@ -83,7 +83,7 @@ func (p *Pool) GetWithSize(size int) *RingBuffer { v := p.pool.Get() if v != nil { rb := v.(*RingBuffer) - if rb.Cap() >= size { + if rb.Len() >= size { return rb } p.pool.Put(v) diff --git a/pkg/ringbuffer/ring_buffer.go b/pkg/ringbuffer/ring_buffer.go index 441424c08..1db2fe34a 100644 --- a/pkg/ringbuffer/ring_buffer.go +++ b/pkg/ringbuffer/ring_buffer.go @@ -32,8 +32,8 @@ const ( bufferGrowThreshold = 4 * 1024 // 4KB ) -// TCPReadBufferSize is the default read buffer size for each TCP socket. -var TCPReadBufferSize = 64 * 1024 // 64KB +// MaxStreamBufferCap is the default buffer size for each stream-oriented connection(TCP/Unix). +var MaxStreamBufferCap = 64 * 1024 // 64KB // ErrIsEmpty will be returned when trying to read an empty ring-buffer. var ErrIsEmpty = errors.New("ring-buffer is empty")