From ee8ff3e48d73ae6791c22f27ffaa4630b3302d4d Mon Sep 17 00:00:00 2001
From: Kyle Xiao
Date: Thu, 6 Jun 2024 17:50:47 +0800
Subject: [PATCH 1/3] fix: Peek oom and performance issue

---
 nocopy_linkbuffer.go      | 75 ++++++++++++++++++++++++++++-----------
 nocopy_linkbuffer_test.go | 52 +++++++++++++++++++++------
 2 files changed, 96 insertions(+), 31 deletions(-)

diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go
index 2f4f3364..62c7c152 100644
--- a/nocopy_linkbuffer.go
+++ b/nocopy_linkbuffer.go
@@ -56,7 +56,12 @@ type UnsafeLinkBuffer struct {
 	flush *linkBufferNode // malloc head
 	write *linkBufferNode // malloc tail
 
-	caches [][]byte // buf allocated by Next when cross-package, which should be freed when release
+	// buf allocated by Next when cross-package, which should be freed when release
+	caches [][]byte
+
+	// for `Peek` only, avoid creating too many []byte in `caches`
+	// fix the issue when we have a large buffer and we call `Peek` multiple times
+	cachePeek []byte
 }
 
 // Len implements Reader.
@@ -124,28 +129,49 @@ func (b *UnsafeLinkBuffer) Peek(n int) (p []byte, err error) {
 	if b.isSingleNode(n) {
 		return b.read.Peek(n), nil
 	}
+
 	// multiple nodes
-	var pIdx int
-	if block1k < n && n <= mallocMax {
-		p = malloc(n, n)
-		b.caches = append(b.caches, p)
-	} else {
-		p = dirtmake.Bytes(n, n)
-	}
-	var node = b.read
-	var l int
-	for ack := n; ack > 0; ack = ack - l {
-		l = node.Len()
-		if l >= ack {
-			pIdx += copy(p[pIdx:], node.Peek(ack))
-			break
-		} else if l > 0 {
-			pIdx += copy(p[pIdx:], node.Peek(l))
+	// always use malloc, since we will reuse b.cachePeek
+	if b.cachePeek != nil && cap(b.cachePeek) < n {
+		free(b.cachePeek)
+		b.cachePeek = nil
+	}
+	if b.cachePeek == nil {
+		b.cachePeek = malloc(n, n)
+		b.cachePeek = b.cachePeek[:0] // init with zero len, will append later
+	}
+	p = b.cachePeek
+	if len(p) >= n {
+		// in case we peek smaller than last time
+		// we will reset cachePeek when Next or Skip
+		return p[:n], nil
+	}
+
+	// How it works >>>>>>
+	// [ -------- node0 -------- ][ --------- node1 --------- ] <- b.read
+	// [ --------------- p --------------- ]
+	//                                      ^ len(p)       ^ n here
+	//                             ^ scanned
+	// `scanned` var is the len of last nodes which we scanned and already copied to p
+	// `len(p) - scanned` is the start pos of current node for p to copy from
+	// `n - len(p)` is the len of bytes we're going to append to p
+	// we copy `len(node1)` - `len(p) - scanned` bytes in case node1 doesn't have enough data
+	for scanned, node := 0, b.read; len(p) < n; node = node.next {
+		l := node.Len()
+		if scanned+l <= len(p) { // already copied in p, skip
+			scanned += l
+			continue
 		}
-		node = node.next
+		start := len(p) - scanned // `start` must be smaller than l coz `scanned+l <= len(p)` is false
+		copyn := n - len(p)
+		if nodeLeftN := l - start; copyn > nodeLeftN {
+			copyn = nodeLeftN
+		}
+		p = append(p, node.Peek(l)[start:start+copyn]...)
+		scanned += l
 	}
-	_ = pIdx
-	return p, nil
+	b.cachePeek = p
+	return p[:n], nil
 }
 
 // Skip implements Reader.
@@ -187,6 +213,10 @@ func (b *UnsafeLinkBuffer) Release() (err error) {
 		b.caches[i] = nil
 	}
 	b.caches = b.caches[:0]
+	if b.cachePeek != nil {
+		free(b.cachePeek)
+		b.cachePeek = nil
+	}
 	return nil
 }
 
@@ -692,6 +722,11 @@ func (b *UnsafeLinkBuffer) indexByte(c byte, skip int) int {
 
 // recalLen re-calculate the length
 func (b *UnsafeLinkBuffer) recalLen(delta int) (length int) {
+	if delta < 0 && len(b.cachePeek) > 0 {
+		// b.cachePeek will contain stale data if we read out even a single byte from buffer,
+		// so we need to reset it or the next Peek call will return invalid bytes.
+		b.cachePeek = b.cachePeek[:0]
+	}
 	return int(atomic.AddInt64(&b.length, int64(delta)))
 }
 
diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go
index 8a9f8cfe..8abbebe0 100644
--- a/nocopy_linkbuffer_test.go
+++ b/nocopy_linkbuffer_test.go
@@ -85,7 +85,7 @@ func TestLinkBuffer(t *testing.T) {
 	Equal(t, buf.Len(), 100)
 }
 
-func TestGetBytes(t *testing.T) {
+func TestLinkBufferGetBytes(t *testing.T) {
 	buf := NewLinkBuffer()
 	var (
 		num = 10
@@ -195,8 +195,7 @@ func TestLinkBufferWithInvalid(t *testing.T) {
 	}
 }
 
-// cross-block operation test
-func TestLinkBufferIndex(t *testing.T) {
+func TestLinkBufferMultiNode(t *testing.T) {
 	// clean & new
 	LinkBufferCap = 8
 
@@ -206,6 +205,9 @@
 
 	var p []byte
 	p, _ = buf.Malloc(15)
+	for i := 0; i < len(p); i++ { // updates p[0] - p[14] to 0 - 14
+		p[i] = byte(i)
+	}
 	Equal(t, len(p), 15)
 	MustTrue(t, buf.read == buf.flush)
 	Equal(t, buf.read.off, 0)
@@ -215,6 +217,9 @@
 
 	Equal(t, cap(buf.write.buf), 16) // mcache up-aligned to the power of 2
 	p, _ = buf.Malloc(7)
+	for i := 0; i < len(p); i++ { // updates p[0] - p[6] to 15 - 21
+		p[i] = byte(i + 15)
+	}
 	Equal(t, len(p), 7)
 	MustTrue(t, buf.read == buf.flush)
 	Equal(t, buf.read.off, 0)
@@ -236,19 +241,44 @@
 
 	p, _ = buf.Next(13)
 	Equal(t, len(p), 13)
+	Equal(t, p[0], byte(0))
+	Equal(t, p[12], byte(12))
 	MustTrue(t, buf.read != buf.flush)
 	Equal(t, buf.read.off, 13)
 	Equal(t, buf.read.Len(), 2)
+	Equal(t, buf.read.next.Len(), 7)
 	Equal(t, buf.flush.off, 0)
 	Equal(t, buf.flush.malloc, 7)
 
+	// Peek
 	p, _ = buf.Peek(4)
 	Equal(t, len(p), 4)
+	Equal(t, p[0], byte(13))
+	Equal(t, p[1], byte(14))
+	Equal(t, p[2], byte(15))
+	Equal(t, p[3], byte(16))
+	Equal(t, len(buf.cachePeek), 4)
+	p, _ = buf.Peek(3) // case: smaller than the last call
+	Equal(t, len(p), 3)
+	Equal(t, p[0], byte(13))
+	Equal(t, p[2], byte(15))
+	Equal(t, len(buf.cachePeek), 4)
+	p, _ = buf.Peek(5) // case: Peek than the max call, and cap(buf.cachePeek) < n
+	Equal(t, len(p), 5)
+	Equal(t, p[0], byte(13))
+	Equal(t, p[4], byte(17))
+	Equal(t, len(buf.cachePeek), 5)
+	p, _ = buf.Peek(6) // case: Peek than the last call, and cap(buf.cachePeek) > n
+	Equal(t, len(p), 6)
+	Equal(t, p[0], byte(13))
+	Equal(t, p[5], byte(18))
+	Equal(t, len(buf.cachePeek), 6)
 	MustTrue(t, buf.read != buf.flush)
 	Equal(t, buf.read.off, 13)
 	Equal(t, buf.read.Len(), 2)
 	Equal(t, buf.flush.off, 0)
 	Equal(t, buf.flush.malloc, 7)
+	// Peek ends
 
 	buf.book(block8k, block8k)
 	MustTrue(t, buf.flush == buf.write)
@@ -377,7 +407,7 @@ func TestLinkBufferResetTail(t *testing.T) {
 	Equal(t, got, except)
 }
 
-func TestWriteBuffer(t *testing.T) {
+func TestLinkBufferWriteBuffer(t *testing.T) {
 	buf1 := NewLinkBuffer()
 	buf2 := NewLinkBuffer()
 	b2, _ := buf2.Malloc(1)
@@ -414,7 +444,7 @@ func TestLinkBufferCheckSingleNode(t *testing.T) {
 	buf.isSingleNode(1)
 }
 
-func TestWriteMultiFlush(t *testing.T) {
+func TestLinkBufferWriteMultiFlush(t *testing.T) {
 	buf := NewLinkBuffer()
 	b1, _ := buf.Malloc(4)
 	b1[0] = 1
@@ -444,7 +474,7 @@ func TestWriteMultiFlush(t *testing.T) {
 	MustTrue(t, len(buf.Bytes()) == 4)
 }
 
-func TestWriteBinary(t *testing.T) {
+func TestLinkBufferWriteBinary(t *testing.T) {
 	// clean & new
 	LinkBufferCap = 8
 
@@ -465,7 +495,7 @@ func TestWriteBinary(t *testing.T) {
 	Equal(t, b[9], byte(0))
 }
 
-func TestWriteDirect(t *testing.T) {
+func TestLinkBufferWriteDirect(t *testing.T) {
 	// clean & new
 	LinkBufferCap = 32
 
@@ -492,7 +522,7 @@
 	}
 }
 
-func TestNoCopyWriteAndRead(t *testing.T) {
+func TestLinkBufferNoCopyWriteAndRead(t *testing.T) {
 	// [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B]
 	const (
 		mallocLen = 4096 * 2
@@ -578,7 +608,7 @@ func TestNoCopyWriteAndRead(t *testing.T) {
 	runtime.KeepAlive(normalBuf)
 }
 
-func TestBufferMode(t *testing.T) {
+func TestLinkBufferBufferMode(t *testing.T) {
 	bufnode := newLinkBufferNode(0)
 	MustTrue(t, !bufnode.getMode(nocopyReadMask))
 	MustTrue(t, bufnode.getMode(readonlyMask))
@@ -726,7 +756,7 @@ func BenchmarkStringToCopy(b *testing.B) {
 	_ = bs
 }
 
-func BenchmarkPoolGet(b *testing.B) {
+func BenchmarkLinkBufferPoolGet(b *testing.B) {
 	var v *linkBufferNode
 	if false {
 		b.Logf("bs = %v", v)
@@ -759,7 +789,7 @@ func BenchmarkCopyString(b *testing.B) {
 	})
 }
 
-func BenchmarkNoCopyRead(b *testing.B) {
+func BenchmarkLinkBufferNoCopyRead(b *testing.B) {
 	totalSize := 0
 	minSize := 32
 	maxSize := minSize << 9

From 0ebdfd619869ffe3bcee6079043bacce82dbd829 Mon Sep 17 00:00:00 2001
From: Kyle Xiao
Date: Thu, 6 Jun 2024 18:22:32 +0800
Subject: [PATCH 2/3] chore: add more comments for UnsafeLinkBuffer Peek

---
 nocopy_linkbuffer.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go
index 62c7c152..d47d7d74 100644
--- a/nocopy_linkbuffer.go
+++ b/nocopy_linkbuffer.go
@@ -142,8 +142,9 @@ func (b *UnsafeLinkBuffer) Peek(n int) (p []byte, err error) {
 	}
 	p = b.cachePeek
 	if len(p) >= n {
-		// in case we peek smaller than last time
-		// we will reset cachePeek when Next or Skip
+		// in case we peek smaller than last time,
+		// we can return cache data directly.
+		// we will reset cachePeek when Next or Skip, no worries about stale data
 		return p[:n], nil
 	}
 

From f7793e8450e4a2c7f18475a6664515b05a49fae9 Mon Sep 17 00:00:00 2001
From: Kyle Xiao
Date: Fri, 7 Jun 2024 13:30:59 +0800
Subject: [PATCH 3/3] chore: minor code optimize

---
 nocopy_linkbuffer.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go
index d47d7d74..bfa80d38 100644
--- a/nocopy_linkbuffer.go
+++ b/nocopy_linkbuffer.go
@@ -131,14 +131,14 @@ func (b *UnsafeLinkBuffer) Peek(n int) (p []byte, err error) {
 	}
 
 	// multiple nodes
-	// always use malloc, since we will reuse b.cachePeek
+
+	// try to make use of the cap of b.cachePeek, if can't, free it.
 	if b.cachePeek != nil && cap(b.cachePeek) < n {
 		free(b.cachePeek)
 		b.cachePeek = nil
 	}
 	if b.cachePeek == nil {
-		b.cachePeek = malloc(n, n)
-		b.cachePeek = b.cachePeek[:0] // init with zero len, will append later
+		b.cachePeek = malloc(0, n) // init with zero len, will append later
 	}
 	p = b.cachePeek
 	if len(p) >= n {
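
Note on the cross-node copy loop added in PATCH 1: the standalone sketch below (not part of the patches) illustrates how the append-based loop reuses a cached slice and copies only the bytes that are still missing, which is what keeps repeated Peek calls with growing n from re-copying and re-allocating. The `node` type and `peekInto` helper are hypothetical stand-ins for netpoll's linkBufferNode chain and b.cachePeek; they are not part of the netpoll API.

package main

import "fmt"

// node stands in for the readable region of one linkBufferNode.
type node []byte

// peekInto mirrors the loop from PATCH 1: `cache` plays the role of
// b.cachePeek and already holds bytes copied by earlier calls; only the
// missing tail (n - len(cache)) is appended. The caller must ensure n does
// not exceed the total readable length (Peek checks b.Len() in the patch).
func peekInto(nodes []node, cache []byte, n int) []byte {
	p := cache
	scanned := 0 // total length of the nodes already walked
	for i := 0; len(p) < n; i++ {
		l := len(nodes[i])
		if scanned+l <= len(p) { // this node was fully copied by a previous call
			scanned += l
			continue
		}
		start := len(p) - scanned // first byte of this node not yet in p
		copyn := n - len(p)       // bytes still missing overall
		if left := l - start; copyn > left {
			copyn = left // node doesn't have enough data, take what it has
		}
		p = append(p, nodes[i][start:start+copyn]...)
		scanned += l
	}
	return p
}

func main() {
	nodes := []node{[]byte("hello "), []byte("link"), []byte("buffer")}

	cache := make([]byte, 0, 16) // stands in for b.cachePeek
	cache = peekInto(nodes, cache, 8)
	fmt.Printf("%q\n", cache) // "hello li"

	// A larger peek only appends the 4 missing bytes instead of re-copying all 12.
	cache = peekInto(nodes, cache, 12)
	fmt.Printf("%q\n", cache) // "hello linkbu"
}

As in the patch, the cached slice is only valid while no bytes are consumed from the buffer; that is why recalLen in PATCH 1 truncates cachePeek whenever the length shrinks.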