From d61c79f7a197170f6d87225a17b1c51447932af8 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Fri, 14 Oct 2022 18:03:05 +0800 Subject: [PATCH 1/4] perf: use local p cache for buffer --- nocopy_linkbuffer.go | 6 +- pcache.go | 136 +++++++++++++++++++++++++++++++++++++++++++ pcache_test.go | 63 ++++++++++++++++++++ 3 files changed, 201 insertions(+), 4 deletions(-) create mode 100644 pcache.go create mode 100644 pcache_test.go diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index fd861b4c..a1fd1438 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -25,8 +25,6 @@ import ( "sync" "sync/atomic" "unsafe" - - "github.com/bytedance/gopkg/lang/mcache" ) // BinaryInplaceThreshold marks the minimum value of the nocopy slice length, @@ -821,7 +819,7 @@ func malloc(size, capacity int) []byte { if capacity > mallocMax { return make([]byte, size, capacity) } - return mcache.Malloc(size, capacity) + return Malloc(size, capacity) } // free limits the cap of the buffer from mcache. @@ -829,5 +827,5 @@ func free(buf []byte) { if cap(buf) > mallocMax { return } - mcache.Free(buf) + Free(buf) } diff --git a/pcache.go b/pcache.go new file mode 100644 index 00000000..5ded9fd7 --- /dev/null +++ b/pcache.go @@ -0,0 +1,136 @@ +package netpoll + +import ( + "math/bits" + "runtime" + _ "unsafe" +) + +//go:linkname procPin runtime.procPin +func procPin() int + +//go:linkname procUnpin runtime.procUnpin +func procUnpin() int + +var globalPCache = newPCache() + +func Malloc(size int, capacity ...int) []byte { + return globalPCache.Malloc(size, capacity...) +} + +func Free(buf []byte) { + globalPCache.Free(buf) +} + +const ( + defaultPCacheMaxSize = 42 // 2^42=4 TB + defaultPCacheLimitPerP = 1024 * 1024 * 64 // 64 MB + defaultPCacheCleanCycles = 3 // clean inactive every 3 gc cycles +) + +type pcache struct { + active [][defaultPCacheMaxSize][][]byte // [pid][cap_idx][idx][]byte + activeSize []int64 // [pid]int64, every P's active size + activeLimit int64 // active_limit_size = total_limit / GOMAXPROCS + inactive [][defaultPCacheMaxSize][][]byte // [pid][cap_idx][idx][]byte + ref *pcacheRef +} + +type pcacheRef struct { + pc *pcache + gc int +} + +func gcRefHandler(ref *pcacheRef) { + ref.gc++ + // trigger handler every gc cycle + if ref.gc >= defaultPCacheCleanCycles { + ref.gc = 0 + pid := procPin() + var l int + for i := 0; i < defaultPCacheMaxSize; i++ { + l = len(ref.pc.inactive[pid][i]) + if l == 0 { + continue + } + ref.pc.inactive[pid][i] = ref.pc.inactive[pid][i][:l/2] + } + procUnpin() + } + runtime.SetFinalizer(ref, gcRefHandler) +} + +func newPCache() *pcache { + return newLimitedPCache(defaultPCacheLimitPerP) +} + +func newLimitedPCache(limitPerP int64) *pcache { + procs := runtime.GOMAXPROCS(0) + pc := &pcache{ + active: make([][defaultPCacheMaxSize][][]byte, procs), + activeSize: make([]int64, procs), + inactive: make([][defaultPCacheMaxSize][][]byte, procs), + activeLimit: limitPerP, + } + pc.ref = &pcacheRef{pc: pc} + runtime.SetFinalizer(pc.ref, gcRefHandler) + pc.ref = nil // trigger gc + return pc +} + +func (p *pcache) Malloc(size int, _capacity ...int) (data []byte) { + var capacity = size + if len(_capacity) > 0 && _capacity[0] > size { + capacity = _capacity[0] + } + cidx := calcCapIndex(capacity) + capacity = 1 << cidx + + pid := procPin() + if len(p.active[pid][cidx]) > 0 { + data = p.active[pid][cidx][0] + p.active[pid][cidx] = p.active[pid][cidx][1:] + p.activeSize[pid] -= int64(capacity) + } else if len(p.inactive[pid][cidx]) > 0 { + data = p.inactive[pid][cidx][0] + 
p.inactive[pid][cidx] = p.inactive[pid][cidx][1:] + } else { + data = make([]byte, size, capacity) + } + procUnpin() + return data[:size] +} + +func (p *pcache) Free(data []byte) { + capacity := cap(data) + cidx := calcCapIndex(capacity) + data = data[:0] + + pid := procPin() + // if out of active limit, put into inactive + if p.activeLimit > 0 && p.activeSize[pid] >= p.activeLimit { + p.inactive[pid][cidx] = append(p.inactive[pid][cidx], data) + } else { + p.activeSize[pid] += int64(capacity) + p.active[pid][cidx] = append(p.active[pid][cidx], data) + } + procUnpin() +} + +func calcCapIndex(size int) int { + if size == 0 { + return 0 + } + if isPowerOfTwo(size) { + return bsr(size) + } + return bsr(size) + 1 +} + +func bsr(x int) int { + return bits.Len(uint(x)) - 1 +} + +func isPowerOfTwo(x int) bool { + return (x & (-x)) == x +} diff --git a/pcache_test.go b/pcache_test.go new file mode 100644 index 00000000..0f0b0638 --- /dev/null +++ b/pcache_test.go @@ -0,0 +1,63 @@ +package netpoll + +import ( + "runtime" + "testing" +) + +func TestPCache(t *testing.T) { + procs := runtime.GOMAXPROCS(0) + runtime.GOMAXPROCS(1) + defer runtime.GOMAXPROCS(procs) + Equal(t, runtime.GOMAXPROCS(0), 1) + + pc := newPCache() + buf := pc.Malloc(1024, 1025) + Equal(t, len(buf), 1024) + Equal(t, cap(buf), 2048) + pc.Free(buf) + Equal(t, len(pc.active[0][calcCapIndex(1025)]), 1) + + buf = pc.Malloc(1024, 1024) + Equal(t, len(buf), 1024) + Equal(t, cap(buf), 1024) + pc.Free(buf) + Equal(t, len(pc.active[0][calcCapIndex(1025)]), 1) + Equal(t, len(pc.active[0][calcCapIndex(1024)]), 1) +} + +func TestPCacheGC(t *testing.T) { + procs := runtime.GOMAXPROCS(0) + runtime.GOMAXPROCS(1) + defer runtime.GOMAXPROCS(procs) + Equal(t, runtime.GOMAXPROCS(0), 1) + + pc := newLimitedPCache(1024) + buf1 := pc.Malloc(1024, 1024) + buf2 := pc.Malloc(1024, 1024) + Equal(t, len(buf1), 1024) + Equal(t, cap(buf1), 1024) + Equal(t, len(buf2), 1024) + Equal(t, cap(buf2), 1024) + pc.Free(buf1) + pc.Free(buf2) + Equal(t, len(pc.active[0][calcCapIndex(1024)]), 1) + Equal(t, len(pc.inactive[0][calcCapIndex(1024)]), 1) + + for i := 0; i < defaultPCacheCleanCycles; i++ { + runtime.GC() + } + Equal(t, len(pc.active[0][calcCapIndex(1024)]), 1) + Equal(t, len(pc.inactive[0][calcCapIndex(1024)]), 0) + buf1 = pc.Malloc(1024, 1024) + buf2 = pc.Malloc(1024, 1024) + pc.Free(buf1) + pc.Free(buf2) + Equal(t, len(pc.active[0][calcCapIndex(1024)]), 1) + Equal(t, len(pc.inactive[0][calcCapIndex(1024)]), 1) + for i := 0; i < defaultPCacheCleanCycles; i++ { + runtime.GC() + } + Equal(t, len(pc.active[0][calcCapIndex(1024)]), 1) + Equal(t, len(pc.inactive[0][calcCapIndex(1024)]), 0) +} From 13a4b2b469f17cc2eb7e64f8405d1f06b8e1aa8d Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Tue, 18 Oct 2022 14:32:50 +0800 Subject: [PATCH 2/4] perf: use fixed arena --- pcache.go | 160 +++++++++++++++++++++++++++++++++++------------- pcache_arena.go | 14 +++++ pcache_test.go | 81 +++++++++++++++++++++++- 3 files changed, 209 insertions(+), 46 deletions(-) create mode 100644 pcache_arena.go diff --git a/pcache.go b/pcache.go index 5ded9fd7..90819583 100644 --- a/pcache.go +++ b/pcache.go @@ -1,8 +1,11 @@ package netpoll import ( + "log" "math/bits" + "reflect" "runtime" + "unsafe" _ "unsafe" ) @@ -23,17 +26,21 @@ func Free(buf []byte) { } const ( - defaultPCacheMaxSize = 42 // 2^42=4 TB - defaultPCacheLimitPerP = 1024 * 1024 * 64 // 64 MB - defaultPCacheCleanCycles = 3 // clean inactive every 3 gc cycles + debug = false + defaultPCacheMaxSize = 42 // 2^42=4 TB + 
defaultPCacheActiveLimitPerP = 1024 * 1024 * 32 + defaultPCacheInactiveLimitPerP = 1024 * 1024 * 16 + defaultPCacheCleanCycles = 3 ) type pcache struct { - active [][defaultPCacheMaxSize][][]byte // [pid][cap_idx][idx][]byte - activeSize []int64 // [pid]int64, every P's active size - activeLimit int64 // active_limit_size = total_limit / GOMAXPROCS - inactive [][defaultPCacheMaxSize][][]byte // [pid][cap_idx][idx][]byte - ref *pcacheRef + arena []byte + arenaStart uintptr // arena address start + arenaEnd uintptr // arena address end + blocks [][]byte // [pid][]byte, pid=(addr-addr_start)/block_size + active [][defaultPCacheMaxSize][][]byte // [pid][cap_idx][idx]stack + inactive [][defaultPCacheMaxSize][][]byte // [pid][cap_idx][idx]stack + ref *pcacheRef } type pcacheRef struct { @@ -42,36 +49,63 @@ type pcacheRef struct { } func gcRefHandler(ref *pcacheRef) { + defer runtime.SetFinalizer(ref, gcRefHandler) ref.gc++ - // trigger handler every gc cycle - if ref.gc >= defaultPCacheCleanCycles { - ref.gc = 0 - pid := procPin() - var l int - for i := 0; i < defaultPCacheMaxSize; i++ { - l = len(ref.pc.inactive[pid][i]) - if l == 0 { - continue - } - ref.pc.inactive[pid][i] = ref.pc.inactive[pid][i][:l/2] - } + if ref.gc < defaultPCacheCleanCycles { + return + } + ref.gc = 0 + + // trigger handler + pid := procPin() + var cached int + for i := 0; i < defaultPCacheMaxSize; i++ { + cached += (1 << i) * len(ref.pc.inactive[pid][i]) + } + if cached == 0 || cached < defaultPCacheInactiveLimitPerP { procUnpin() + return + } + + var buf [][]byte + var l, c, released int + for i := 0; i < defaultPCacheMaxSize; i++ { + l = len(ref.pc.inactive[pid][i]) + if l == 0 { + continue + } + c = 1 << i + buf = make([][]byte, l/2, l) + copy(buf, ref.pc.inactive[pid][i][:l/2]) // the first l/2 items are more inactive + ref.pc.inactive[pid][i] = buf + released += c * len(buf) + } + procUnpin() + if debug && released > 0 { + log.Printf("PCACHE: P[%d] release: %d bytes", pid, released) } - runtime.SetFinalizer(ref, gcRefHandler) } func newPCache() *pcache { - return newLimitedPCache(defaultPCacheLimitPerP) + return newLimitedPCache(defaultPCacheActiveLimitPerP) } -func newLimitedPCache(limitPerP int64) *pcache { +func newLimitedPCache(limitPerP int) *pcache { procs := runtime.GOMAXPROCS(0) pc := &pcache{ - active: make([][defaultPCacheMaxSize][][]byte, procs), - activeSize: make([]int64, procs), - inactive: make([][defaultPCacheMaxSize][][]byte, procs), - activeLimit: limitPerP, + blocks: make([][]byte, procs), + active: make([][defaultPCacheMaxSize][][]byte, procs), + inactive: make([][defaultPCacheMaxSize][][]byte, procs), + } + + // init arena + pc.arena = NewArena(limitPerP * procs) + pc.arenaStart = uintptr(unsafe.Pointer(&pc.arena[0])) + pc.arenaEnd = uintptr(unsafe.Pointer(&pc.arena[len(pc.arena)-1])) + for i := 0; i < procs; i++ { + pc.blocks[i] = pc.arena[i*limitPerP : (i+1)*limitPerP] } + pc.ref = &pcacheRef{pc: pc} runtime.SetFinalizer(pc.ref, gcRefHandler) pc.ref = nil // trigger gc @@ -84,37 +118,77 @@ func (p *pcache) Malloc(size int, _capacity ...int) (data []byte) { capacity = _capacity[0] } cidx := calcCapIndex(capacity) - capacity = 1 << cidx + clen := 1 << cidx pid := procPin() - if len(p.active[pid][cidx]) > 0 { - data = p.active[pid][cidx][0] - p.active[pid][cidx] = p.active[pid][cidx][1:] - p.activeSize[pid] -= int64(capacity) - } else if len(p.inactive[pid][cidx]) > 0 { - data = p.inactive[pid][cidx][0] - p.inactive[pid][cidx] = p.inactive[pid][cidx][1:] - } else { - data = make([]byte, size, 
capacity) + l := len(p.active[pid][cidx]) + if l > 0 { + data = p.active[pid][cidx][l-1][:size:capacity] + p.active[pid][cidx] = p.active[pid][cidx][:l-1] + procUnpin() + if debug { + log.Printf("PCACHE: P[%d] reuse active %d bytes, addr %d", pid, clen, uintptr(unsafe.Pointer(&data[:1][0]))) + } + return data } + + l = len(p.inactive[pid][cidx]) + if l > 0 { + data = p.inactive[pid][cidx][l-1][:size:capacity] + p.inactive[pid][cidx] = p.inactive[pid][cidx][:l-1] + procUnpin() + if debug { + log.Printf("PCACHE: P[%d] reuse inactive %d bytes, addr %d", pid, clen, uintptr(unsafe.Pointer(&data[:1][0]))) + } + return data + } + + if clen <= len(p.blocks[pid]) { + data = p.blocks[pid][:size:capacity] + p.blocks[pid] = p.blocks[pid][clen:] // need occupy full clen not only capacity + procUnpin() + if debug { + log.Printf("PCACHE: P[%d] reuse arena %d bytes, addr %d", pid, clen, uintptr(unsafe.Pointer(&data[:1][0]))) + } + return data + } + procUnpin() - return data[:size] + // malloc full clen buffer but only use capacity size + data = make([]byte, size, clen)[:size:capacity] + if debug { + log.Printf("PCACHE: P[%d] malloc %d bytes, addr %d", pid, clen, uintptr(unsafe.Pointer(&data[:1][0]))) + } + return data } func (p *pcache) Free(data []byte) { capacity := cap(data) + if capacity == 0 { + return + } cidx := calcCapIndex(capacity) + clen := 1 << cidx + dp := (*reflect.SliceHeader)(unsafe.Pointer(&data)) + addr := dp.Data data = data[:0] + dp.Cap = clen pid := procPin() - // if out of active limit, put into inactive - if p.activeLimit > 0 && p.activeSize[pid] >= p.activeLimit { - p.inactive[pid][cidx] = append(p.inactive[pid][cidx], data) - } else { - p.activeSize[pid] += int64(capacity) + if addr >= p.arenaStart && addr <= p.arenaEnd { p.active[pid][cidx] = append(p.active[pid][cidx], data) + procUnpin() + if debug { + log.Printf("PCACHE: P[%d] free active %d bytes, addr %d", pid, clen, addr) + } + return } + + p.inactive[pid][cidx] = append(p.inactive[pid][cidx], data) procUnpin() + if debug { + log.Printf("PCACHE: P[%d] free inactive %d bytes, addr %d", pid, clen, addr) + } } func calcCapIndex(size int) int { diff --git a/pcache_arena.go b/pcache_arena.go new file mode 100644 index 00000000..620cf2bd --- /dev/null +++ b/pcache_arena.go @@ -0,0 +1,14 @@ +package netpoll + +// #include +import "C" +import ( + "unsafe" +) + +const arenaSize = 4 << 30 // 4gb + +func NewArena(size int) []byte { + data := ((*[arenaSize]byte)(unsafe.Pointer(C.calloc(C.size_t(size), C.size_t(1)))))[:size:size] + return data +} diff --git a/pcache_test.go b/pcache_test.go index 0f0b0638..6a78184b 100644 --- a/pcache_test.go +++ b/pcache_test.go @@ -1,11 +1,17 @@ +//+build !race + package netpoll import ( + "fmt" "runtime" + "sync" "testing" + + "github.com/bytedance/gopkg/lang/mcache" ) -func TestPCache(t *testing.T) { +func TestPCacheSingleP(t *testing.T) { procs := runtime.GOMAXPROCS(0) runtime.GOMAXPROCS(1) defer runtime.GOMAXPROCS(procs) @@ -14,7 +20,7 @@ func TestPCache(t *testing.T) { pc := newPCache() buf := pc.Malloc(1024, 1025) Equal(t, len(buf), 1024) - Equal(t, cap(buf), 2048) + Equal(t, cap(buf), 1025) pc.Free(buf) Equal(t, len(pc.active[0][calcCapIndex(1025)]), 1) @@ -22,8 +28,35 @@ func TestPCache(t *testing.T) { Equal(t, len(buf), 1024) Equal(t, cap(buf), 1024) pc.Free(buf) - Equal(t, len(pc.active[0][calcCapIndex(1025)]), 1) Equal(t, len(pc.active[0][calcCapIndex(1024)]), 1) + Equal(t, len(pc.active[0][calcCapIndex(1025)]), 1) +} + +func TestPCacheMultiP(t *testing.T) { + pc := newLimitedPCache(1024) 
// 1KB + size := 50 + testdata := make([]byte, size) + for i := 0; i < len(testdata); i++ { + testdata[i] = 'a' + byte(i%26) + } + procs := runtime.GOMAXPROCS(0) + var wg sync.WaitGroup + for i := 0; i < procs; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 1000; j++ { + buf := pc.Malloc(size, size) + copy(buf, testdata) + Equal(t, len(buf), size) + Equal(t, cap(buf), size) + Equal(t, string(buf), string(testdata)) + runtime.Gosched() + pc.Free(buf) + } + }() + } + wg.Wait() } func TestPCacheGC(t *testing.T) { @@ -61,3 +94,45 @@ func TestPCacheGC(t *testing.T) { Equal(t, len(pc.active[0][calcCapIndex(1024)]), 1) Equal(t, len(pc.inactive[0][calcCapIndex(1024)]), 0) } + +var benchSizes = []int{ + 1024, + 1024 * 8, 1024 * 64, // small size + 1024 * 1024, 1024 * 1024 * 8, // large size +} // unit: Bytes + +func BenchmarkPCache(b *testing.B) { + for _, size := range benchSizes { + pc := newLimitedPCache(1024 * 1024 * 1) // 1MB + b.Run(fmt.Sprintf("Malloc-%d", size), func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + buf := pc.Malloc(size) + pc.Free(buf) + } + }) + }) + d := make([]int, len(pc.active)) + for pid := 0; pid < len(pc.active); pid++ { + for cidx := 0; cidx < len(pc.active[pid]); cidx++ { + for i := 0; i < len(pc.active[pid][cidx]); i++ { + d[pid] += cap(pc.active[pid][cidx][i]) + } + } + } + b.Logf("pcache-%d active distribution: %v", size, d) + } +} + +func BenchmarkMCache(b *testing.B) { + for _, size := range benchSizes { + b.Run(fmt.Sprintf("Malloc-%d", size), func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + buf := mcache.Malloc(size) + mcache.Free(buf) + } + }) + }) + } +} From fd650839762cbfef828a487653c8cfaabada6e0f Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Thu, 20 Oct 2022 18:23:53 +0800 Subject: [PATCH 3/4] perf: merge inactive memory into huge page --- pcache.go | 70 ++++++++++++++++++++++++------------------------- pcache_arena.go | 14 ---------- 2 files changed, 35 insertions(+), 49 deletions(-) delete mode 100644 pcache_arena.go diff --git a/pcache.go b/pcache.go index 90819583..056030dc 100644 --- a/pcache.go +++ b/pcache.go @@ -26,21 +26,21 @@ func Free(buf []byte) { } const ( - debug = false - defaultPCacheMaxSize = 42 // 2^42=4 TB - defaultPCacheActiveLimitPerP = 1024 * 1024 * 32 - defaultPCacheInactiveLimitPerP = 1024 * 1024 * 16 - defaultPCacheCleanCycles = 3 + debug = false + defaultPCacheMaxSize = 42 // 2^42=4 TB + defaultPCacheBlockSize = 1024 * 1024 * 16 + defaultPCacheCleanCycles = 3 ) type pcache struct { - arena []byte - arenaStart uintptr // arena address start - arenaEnd uintptr // arena address end - blocks [][]byte // [pid][]byte, pid=(addr-addr_start)/block_size - active [][defaultPCacheMaxSize][][]byte // [pid][cap_idx][idx]stack - inactive [][defaultPCacheMaxSize][][]byte // [pid][cap_idx][idx]stack - ref *pcacheRef + arena []byte // fixed size continuous memory + arenaStart uintptr // arena address start + arenaEnd uintptr // arena address end + activeBlocks [][]byte // [pid][]byte + inactiveBlocks [][]byte // [pid][]byte + active [][defaultPCacheMaxSize][][]byte // [pid][cap_idx][idx]stack + inactive [][defaultPCacheMaxSize][][]byte // [pid][cap_idx][idx]stack + ref *pcacheRef // for gc trigger } type pcacheRef struct { @@ -58,15 +58,6 @@ func gcRefHandler(ref *pcacheRef) { // trigger handler pid := procPin() - var cached int - for i := 0; i < defaultPCacheMaxSize; i++ { - cached += (1 << i) * len(ref.pc.inactive[pid][i]) - } - if cached == 0 || cached < 
defaultPCacheInactiveLimitPerP { - procUnpin() - return - } - var buf [][]byte var l, c, released int for i := 0; i < defaultPCacheMaxSize; i++ { @@ -87,23 +78,25 @@ func gcRefHandler(ref *pcacheRef) { } func newPCache() *pcache { - return newLimitedPCache(defaultPCacheActiveLimitPerP) + return newLimitedPCache(defaultPCacheBlockSize * runtime.GOMAXPROCS(0)) } -func newLimitedPCache(limitPerP int) *pcache { +func newLimitedPCache(size int) *pcache { procs := runtime.GOMAXPROCS(0) + sizePerP := size / procs pc := &pcache{ - blocks: make([][]byte, procs), - active: make([][defaultPCacheMaxSize][][]byte, procs), - inactive: make([][defaultPCacheMaxSize][][]byte, procs), + activeBlocks: make([][]byte, procs), + inactiveBlocks: make([][]byte, procs), + active: make([][defaultPCacheMaxSize][][]byte, procs), + inactive: make([][defaultPCacheMaxSize][][]byte, procs), } // init arena - pc.arena = NewArena(limitPerP * procs) + pc.arena = make([]byte, size) pc.arenaStart = uintptr(unsafe.Pointer(&pc.arena[0])) pc.arenaEnd = uintptr(unsafe.Pointer(&pc.arena[len(pc.arena)-1])) for i := 0; i < procs; i++ { - pc.blocks[i] = pc.arena[i*limitPerP : (i+1)*limitPerP] + pc.activeBlocks[i] = pc.arena[i*sizePerP : (i+1)*sizePerP] } pc.ref = &pcacheRef{pc: pc} @@ -143,21 +136,28 @@ func (p *pcache) Malloc(size int, _capacity ...int) (data []byte) { return data } - if clen <= len(p.blocks[pid]) { - data = p.blocks[pid][:size:capacity] - p.blocks[pid] = p.blocks[pid][clen:] // need occupy full clen not only capacity + if clen <= len(p.activeBlocks[pid]) { + data = p.activeBlocks[pid][:size:capacity] + p.activeBlocks[pid] = p.activeBlocks[pid][clen:] procUnpin() if debug { - log.Printf("PCACHE: P[%d] reuse arena %d bytes, addr %d", pid, clen, uintptr(unsafe.Pointer(&data[:1][0]))) + log.Printf("PCACHE: P[%d] malloc from activeBlock %d bytes, addr %d", pid, clen, uintptr(unsafe.Pointer(&data[:1][0]))) } return data } + if clen > len(p.inactiveBlocks[pid]) { + if clen < defaultPCacheBlockSize { + p.inactiveBlocks[pid] = make([]byte, defaultPCacheBlockSize) + } else { + p.inactiveBlocks[pid] = make([]byte, clen) + } + } + data = p.inactiveBlocks[pid][:size:capacity] + p.inactiveBlocks[pid] = p.inactiveBlocks[pid][clen:] procUnpin() - // malloc full clen buffer but only use capacity size - data = make([]byte, size, clen)[:size:capacity] if debug { - log.Printf("PCACHE: P[%d] malloc %d bytes, addr %d", pid, clen, uintptr(unsafe.Pointer(&data[:1][0]))) + log.Printf("PCACHE: P[%d] malloc from inactiveBlock %d bytes, addr %d", pid, clen, uintptr(unsafe.Pointer(&data[:1][0]))) } return data } diff --git a/pcache_arena.go b/pcache_arena.go deleted file mode 100644 index 620cf2bd..00000000 --- a/pcache_arena.go +++ /dev/null @@ -1,14 +0,0 @@ -package netpoll - -// #include -import "C" -import ( - "unsafe" -) - -const arenaSize = 4 << 30 // 4gb - -func NewArena(size int) []byte { - data := ((*[arenaSize]byte)(unsafe.Pointer(C.calloc(C.size_t(size), C.size_t(1)))))[:size:size] - return data -} From bec9b7a018b04c105a75e44a4cfef393487dbbe3 Mon Sep 17 00:00:00 2001 From: Joway Date: Tue, 25 Oct 2022 14:50:38 +0800 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- pcache.go | 14 ++++++++++++++ pcache_test.go | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/pcache.go b/pcache.go index 056030dc..3d656212 100644 --- a/pcache.go +++ b/pcache.go @@ -1,3 +1,17 @@ +// Copyright 2022 CloudWeGo Authors +// +// Licensed 
under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package netpoll import ( diff --git a/pcache_test.go b/pcache_test.go index 6a78184b..30214950 100644 --- a/pcache_test.go +++ b/pcache_test.go @@ -1,3 +1,17 @@ +// Copyright 2022 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + //+build !race package netpoll
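
For reviewers who want to poke at the series by hand, here is a minimal sketch of how the exported Malloc/Free pair introduced in PATCH 1/4 can be exercised from inside the package. The test name and the sizes are illustrative only and are not part of the series.

package netpoll

import "testing"

// Illustrative sketch (not included in the series): exercises the exported
// Malloc/Free wrappers around the global per-P cache.
func TestMallocFreeSketch(t *testing.T) {
	// Ask for 10 usable bytes backed by at least 1 KB of capacity.
	buf := Malloc(10, 1024)
	if len(buf) != 10 || cap(buf) < 1024 {
		t.Fatalf("unexpected buffer shape: len=%d cap=%d", len(buf), cap(buf))
	}
	copy(buf, "hello")
	// Hand the buffer back to the current P's cache; the caller must not
	// reuse buf after Free.
	Free(buf)
}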
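
The per-P free lists in pcache.go are keyed by power-of-two size classes, which is why the first test in PATCH 1/4 expects cap 2048 for a requested capacity of 1025. The standalone program below re-states the calcCapIndex/bsr/isPowerOfTwo math from the series (same logic, collapsed into one helper) and prints how a few capacities map to classes; it is a sketch for illustration, not code taken from the patch.

package main

import (
	"fmt"
	"math/bits"
)

// Same size-class math as calcCapIndex in pcache.go: a capacity maps to the
// smallest power of two that can hold it, and the exponent is the list index.
func calcCapIndex(size int) int {
	if size == 0 {
		return 0
	}
	if size&(-size) == size { // isPowerOfTwo
		return bits.Len(uint(size)) - 1 // bsr
	}
	return bits.Len(uint(size)) // bsr + 1: round up to the next power of two
}

func main() {
	for _, n := range []int{1, 512, 1024, 1025, 4096} {
		idx := calcCapIndex(n)
		fmt.Printf("capacity %4d -> class %2d (%d-byte slot)\n", n, idx, 1<<idx)
	}
}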