feat: expose config and features control
joway committed Jul 22, 2024
1 parent 1a975d1 commit d9766f0
Showing 6 changed files with 184 additions and 144 deletions.
4 changes: 2 additions & 2 deletions connection_impl.go
@@ -330,8 +330,8 @@ func (c *connection) init(conn Conn, opts *options) (err error) {
// init buffer, barrier, finalizer
c.readTrigger = make(chan error, 1)
c.writeTrigger = make(chan error, 1)
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
c.bookSize, c.maxSize = defaultLinkBufferSize, defaultLinkBufferSize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(defaultLinkBufferSize), NewLinkBuffer()
c.outputBarrier = barrierPool.Get().(*barrier)
c.state = 0

85 changes: 76 additions & 9 deletions netpoll_options.go
@@ -20,9 +20,75 @@ package netpoll
import (
"context"
"io"
"log"
"os"
"runtime"
"time"
)

var (
pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) // pollmanager manages all pollers
logger = log.New(os.Stderr, "", log.LstdFlags)

// global config
defaultLinkBufferSize = pagesize
featureAlwaysNoCopyRead = false
)

// Config exposes some tuning parameters that control the internal behaviors of netpoll.
// Every parameter left at its zero value keeps netpoll's default behavior.
type Config struct {
PollerNum int // number of pollers
BufferSize int // default size of a new connection's LinkBuffer
Runner func(ctx context.Context, f func()) // runner for event handlers; typically backed by a goroutine pool.
LoggerOutput io.Writer // logger output
LoadBalance LoadBalance // load balance for poller picker
Feature // defines all features that are not enabled by default
}

// Feature exposes new features that may be promoted to default behavior later, but are not yet.
type Feature struct {
// AlwaysNoCopyRead makes copying Read functions such as ReadBinary/ReadString
// use NoCopy reads and stop reusing the underlying buffer.
// It brings the most benefit when a codec needs to read many large strings or byte slices.
AlwaysNoCopyRead bool
}

// Configure sets the internal behaviors of netpoll.
// Configure must be called in an init() function, because the pollers read these global variables after init() has finished.
func Configure(config Config) (err error) {
if config.PollerNum > 0 {
if err = pollmanager.SetNumLoops(config.PollerNum); err != nil {
return err
}
}
if config.BufferSize > 0 {
defaultLinkBufferSize = config.BufferSize
}

if config.Runner != nil {
setRunner(config.Runner)
}
if config.LoggerOutput != nil {
logger = log.New(config.LoggerOutput, "", log.LstdFlags)
}
if config.LoadBalance >= 0 {
if err = pollmanager.SetLoadBalance(config.LoadBalance); err != nil {
return err
}
}

featureAlwaysNoCopyRead = config.AlwaysNoCopyRead
return nil
}

// Initialize creates the pollers eagerly; by default they are initialized lazily.
// It is safe to call it multiple times.
func Initialize() {
// The first call to Pick() initializes the pollers
_ = pollmanager.Pick()
}

// SetNumLoops is used to set the number of pollers; generally there is no need to set it actively.
// By default, the number of pollers is equal to runtime.GOMAXPROCS(0)/20+1.
// If your service process has fewer than 20 cores, theoretically only one poller is needed.
Expand All @@ -34,28 +100,28 @@ import (
// func init() {
// netpoll.SetNumLoops(...)
// }
//
// Deprecated: use Configure instead.
func SetNumLoops(numLoops int) error {
return setNumLoops(numLoops)
return pollmanager.SetNumLoops(numLoops)
}

// SetLoadBalance sets the load balancing method. Load balancing is always a best-effort attempt
// to distribute incoming connections across multiple pollers.
// This option only works when numLoops is set.
// Deprecated: use Configure instead.
func SetLoadBalance(lb LoadBalance) error {
return setLoadBalance(lb)
}

// Initialize the pollers actively. By default, it's lazy initialized.
// It's safe to call it multi times.
func Initialize() {
initialize()
return pollmanager.SetLoadBalance(lb)
}

// SetLoggerOutput sets the logger output target.
// Deprecated: use Configure instead.
func SetLoggerOutput(w io.Writer) {
setLoggerOutput(w)
logger = log.New(w, "", log.LstdFlags)
}

// SetRunner sets the runner function for every OnRequest/OnConnect callback.
// Deprecated: use Configure instead.
func SetRunner(f func(ctx context.Context, f func())) {
setRunner(f)
}
@@ -65,6 +131,7 @@ func SetRunner(f func(ctx context.Context, f func())) {
// Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutines.
// But if you can confirm that OnRequest will not cause stack expansion,
// it is recommended to use DisableGopool to reduce redundancy and improve performance.
// Deprecated: use Configure instead.
func DisableGopool() error {
return disableGopool()
}
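
Taken together, the new entry point is Configure, which is meant to be called from an init() function before any poller is created. Below is a minimal usage sketch; the github.com/cloudwego/netpoll import path and the concrete values are illustrative assumptions, not part of this commit.

package main

import (
	"context"
	"os"

	"github.com/cloudwego/netpoll"
)

func init() {
	// Configure must run before pollers are created, hence init().
	err := netpoll.Configure(netpoll.Config{
		PollerNum:    4,                  // overrides runtime.GOMAXPROCS(0)/20+1
		BufferSize:   32 * 1024,          // default LinkBuffer size for new connections
		LoggerOutput: os.Stderr,          // target for netpoll's internal logs
		LoadBalance:  netpoll.RoundRobin, // zero value already selects round-robin
		Runner: func(ctx context.Context, f func()) {
			go f() // illustrative; most deployments would dispatch to a goroutine pool here
		},
		Feature: netpoll.Feature{AlwaysNoCopyRead: true},
	})
	if err != nil {
		panic(err)
	}
}

func main() {
	netpoll.Initialize() // optional: create the pollers eagerly instead of lazily
	// ... set up listeners and event loops as usual
}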
26 changes: 13 additions & 13 deletions nocopy_linkbuffer.go
@@ -252,19 +252,19 @@ func (b *UnsafeLinkBuffer) readBinary(n int) (p []byte) {
// single node
if b.isSingleNode(n) {
// TODO: enable nocopy read mode once it is ensured that no legacy code depends on copy-read
//// we cannot nocopy read a readonly mode buffer, since readonly buffer's memory is not control by itself
//if !b.read.getMode(readonlyMask) {
// // if readBinary use no-copy mode, it will cause more memory used but get higher memory access efficiently
// // for example, if user's codec need to decode 10 strings and each have 100 bytes, here could help the codec
// // no need to malloc 10 times and the string slice could have the compact memory allocation.
// if b.read.getMode(nocopyReadMask) {
// return b.read.Next(n)
// }
// if n >= minReuseBytes && cap(b.read.buf) <= block32k {
// b.read.setMode(nocopyReadMask, true)
// return b.read.Next(n)
// }
//}
// we cannot nocopy read a readonly mode buffer, since a readonly buffer's memory is not controlled by the buffer itself
if !b.read.getMode(readonlyMask) {
// if readBinary uses no-copy mode, it consumes more memory but gains more efficient memory access;
// for example, if a user's codec needs to decode 10 strings of 100 bytes each, no-copy mode lets the codec
// avoid 10 mallocs and gives the string slice a compact memory allocation.
if b.read.getMode(nocopyReadMask) {
return b.read.Next(n)
}
if featureAlwaysNoCopyRead && n >= minReuseBytes {
b.read.setMode(nocopyReadMask, true)
return b.read.Next(n)
}
}
// if the underlying buffer is too large, we shouldn't use no-copy mode
p = dirtmake.Bytes(n, n)
copy(p, b.read.Next(n))
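
For reference, the gating that readBinary now applies to a single read node can be summarized by the standalone sketch below; the helper name and its boolean parameters are hypothetical and simply restate the branch conditions above.

// decideNoCopyRead restates the branch logic of readBinary after this commit:
// readonly nodes are always copied, nodes already in no-copy mode stay no-copy,
// and the AlwaysNoCopyRead feature switches sufficiently large reads to no-copy mode.
func decideNoCopyRead(readonly, alreadyNoCopy, alwaysNoCopyRead bool, n, minReuseBytes int) bool {
	if readonly {
		return false // a readonly buffer does not own its memory, so it must be copied
	}
	if alreadyNoCopy {
		return true // the node was switched to no-copy mode by an earlier read
	}
	return alwaysNoCopyRead && n >= minReuseBytes
}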
172 changes: 87 additions & 85 deletions nocopy_linkbuffer_test.go
@@ -21,6 +21,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"runtime"
"sync/atomic"
"testing"
)
@@ -522,91 +523,92 @@ func TestLinkBufferWriteDirect(t *testing.T) {
}
}

//func TestLinkBufferNoCopyWriteAndRead(t *testing.T) {
// // [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B]
// const (
// mallocLen = 4096 * 2
// originLen = 4096
// dataLen = 512
// newLen = 16
// normalLen = 4096
// )
// buf := NewLinkBuffer()
// bt, _ := buf.Malloc(mallocLen)
// originBuf := bt[:originLen]
// newBuf := bt[originLen : originLen+newLen]
//
// // write origin_node
// for i := 0; i < originLen; i++ {
// bt[i] = 'a'
// }
// // write data_node
// userBuf := make([]byte, dataLen)
// for i := 0; i < len(userBuf); i++ {
// userBuf[i] = 'b'
// }
// buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write
// // write new_node
// for i := 0; i < newLen; i++ {
// bt[originLen+i] = 'c'
// }
// buf.MallocAck(originLen + dataLen + newLen)
// buf.Flush()
// // write normal_node
// normalBuf, _ := buf.Malloc(normalLen)
// for i := 0; i < normalLen; i++ {
// normalBuf[i] = 'd'
// }
// buf.Flush()
// Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen)
//
// // copy read origin_node
// bt, _ = buf.ReadBinary(originLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'a')
// }
// MustTrue(t, &bt[0] != &originBuf[0])
// // next read node is data node and must be readonly and non-reusable
// MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable())
// // copy read data_node
// bt, _ = buf.ReadBinary(dataLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'b')
// }
// MustTrue(t, &bt[0] != &userBuf[0])
// // copy read new_node
// bt, _ = buf.ReadBinary(newLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'c')
// }
// MustTrue(t, &bt[0] != &newBuf[0])
// // current read node is the new node and must not be reusable
// newnode := buf.read
// t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask))
// MustTrue(t, newnode.reusable())
// var nodeReleased int32
// runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) {
// atomic.AddInt32(&nodeReleased, 1)
// })
// // nocopy read normal_node
// bt, _ = buf.ReadBinary(normalLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'd')
// }
// MustTrue(t, &bt[0] == &normalBuf[0])
// // normal buffer never should be released
// runtime.SetFinalizer(&bt[0], func(_ *byte) {
// atomic.AddInt32(&nodeReleased, 1)
// })
// _ = buf.Release()
// MustTrue(t, newnode.buf == nil)
// for atomic.LoadInt32(&nodeReleased) == 0 {
// runtime.GC()
// t.Log("newnode release checking")
// }
// Equal(t, atomic.LoadInt32(&nodeReleased), int32(1))
// runtime.KeepAlive(normalBuf)
//}
func TestLinkBufferNoCopyWriteAndRead(t *testing.T) {
Configure(Config{Feature: Feature{AlwaysNoCopyRead: true}})
// [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B]
const (
mallocLen = 4096 * 2
originLen = 4096
dataLen = 512
newLen = 16
normalLen = 4096
)
buf := NewLinkBuffer()
bt, _ := buf.Malloc(mallocLen)
originBuf := bt[:originLen]
newBuf := bt[originLen : originLen+newLen]

// write origin_node
for i := 0; i < originLen; i++ {
bt[i] = 'a'
}
// write data_node
userBuf := make([]byte, dataLen)
for i := 0; i < len(userBuf); i++ {
userBuf[i] = 'b'
}
buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write
// write new_node
for i := 0; i < newLen; i++ {
bt[originLen+i] = 'c'
}
buf.MallocAck(originLen + dataLen + newLen)
buf.Flush()
// write normal_node
normalBuf, _ := buf.Malloc(normalLen)
for i := 0; i < normalLen; i++ {
normalBuf[i] = 'd'
}
buf.Flush()
Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen)

// copy read origin_node
bt, _ = buf.ReadBinary(originLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'a')
}
MustTrue(t, &bt[0] != &originBuf[0])
// next read node is data node and must be readonly and non-reusable
MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable())
// copy read data_node
bt, _ = buf.ReadBinary(dataLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'b')
}
MustTrue(t, &bt[0] != &userBuf[0])
// copy read new_node
bt, _ = buf.ReadBinary(newLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'c')
}
MustTrue(t, &bt[0] != &newBuf[0])
// current read node is the new node and must not be reusable
newnode := buf.read
t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask))
MustTrue(t, newnode.reusable())
var nodeReleased int32
runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) {
atomic.AddInt32(&nodeReleased, 1)
})
// nocopy read normal_node
bt, _ = buf.ReadBinary(normalLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'd')
}
MustTrue(t, &bt[0] == &normalBuf[0])
// normal buffer never should be released
runtime.SetFinalizer(&bt[0], func(_ *byte) {
atomic.AddInt32(&nodeReleased, 1)
})
_ = buf.Release()
MustTrue(t, newnode.buf == nil)
for atomic.LoadInt32(&nodeReleased) == 0 {
runtime.GC()
t.Log("newnode release checking")
}
Equal(t, atomic.LoadInt32(&nodeReleased), int32(1))
runtime.KeepAlive(normalBuf)
}

func TestLinkBufferBufferMode(t *testing.T) {
bufnode := newLinkBufferNode(0)
12 changes: 6 additions & 6 deletions poll_loadbalance.go
@@ -24,28 +24,28 @@ import (
type LoadBalance int

const (
// Random requests that connections are randomly distributed.
Random LoadBalance = iota
// RoundRobin requests that connections are distributed to a Poll
// in a round-robin fashion.
RoundRobin
RoundRobin LoadBalance = iota
// Random requests that connections are randomly distributed.
Random
)

// loadbalance sets the load balancing method for []*polls
type loadbalance interface {
LoadBalance() LoadBalance
// Choose the most qualified Poll
// Pick chooses the most qualified Poll
Pick() (poll Poll)

Rebalance(polls []Poll)
}

func newLoadbalance(lb LoadBalance, polls []Poll) loadbalance {
switch lb {
case Random:
return newRandomLB(polls)
case RoundRobin:
return newRoundRobinLB(polls)
case Random:
return newRandomLB(polls)
}
return newRoundRobinLB(polls)
}
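
Note that swapping the constant order makes RoundRobin the zero value of LoadBalance, so a zero-valued Config (and the fall-through above) keeps round-robin behavior. A small standalone illustration, with the github.com/cloudwego/netpoll import path assumed:

package main

import (
	"fmt"

	"github.com/cloudwego/netpoll"
)

func main() {
	var cfg netpoll.Config
	// With RoundRobin declared first (iota == 0), an unset LoadBalance
	// now selects the round-robin picker rather than the random one.
	fmt.Println(cfg.LoadBalance == netpoll.RoundRobin) // true
}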