diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go
index 93871a65..0d793ada 100644
--- a/nocopy_linkbuffer.go
+++ b/nocopy_linkbuffer.go
@@ -78,6 +78,9 @@ func (b *LinkBuffer) IsEmpty() (ok bool) {
 
 // Next implements Reader.
 func (b *LinkBuffer) Next(n int) (p []byte, err error) {
+	if n <= 0 {
+		return
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return p, fmt.Errorf("link buffer next[%d] not enough", n)
@@ -85,8 +88,7 @@ func (b *LinkBuffer) Next(n int) (p []byte, err error) {
 	b.recalLen(-n) // re-cal length
 
 	// single node
-	l := b.firstReadLen()
-	if l >= n {
+	if b.isSingleNode(n) {
 		return b.read.Next(n), nil
 	}
 	// multiple nodes
@@ -97,6 +99,7 @@ func (b *LinkBuffer) Next(n int) (p []byte, err error) {
 	} else {
 		p = make([]byte, n)
 	}
+	var l int
 	for ack := n; ack > 0; ack = ack - l {
 		l = b.read.Len()
 		if l >= ack {
@@ -114,13 +117,15 @@ func (b *LinkBuffer) Next(n int) (p []byte, err error) {
 // Peek does not have an independent lifecycle, and there is no signal to
 // indicate that Peek content can be released, so Peek will not introduce mcache for now.
 func (b *LinkBuffer) Peek(n int) (p []byte, err error) {
+	if n <= 0 {
+		return
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return p, fmt.Errorf("link buffer peek[%d] not enough", n)
 	}
 	// single node
-	l := b.firstReadLen()
-	if l >= n {
+	if b.isSingleNode(n) {
 		return b.read.Peek(n), nil
 	}
 	// multiple nodes
@@ -132,6 +137,7 @@ func (b *LinkBuffer) Peek(n int) (p []byte, err error) {
 		p = make([]byte, n)
 	}
 	var node = b.read
+	var l int
 	for ack := n; ack > 0; ack = ack - l {
 		l = node.Len()
 		if l >= ack {
@@ -148,6 +154,9 @@ func (b *LinkBuffer) Peek(n int) (p []byte, err error) {
 
 // Skip implements Reader.
 func (b *LinkBuffer) Skip(n int) (err error) {
+	if n <= 0 {
+		return
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return fmt.Errorf("link buffer skip[%d] not enough", n)
@@ -187,6 +196,9 @@ func (b *LinkBuffer) Release() (err error) {
 
 // ReadString implements Reader.
 func (b *LinkBuffer) ReadString(n int) (s string, err error) {
+	if n <= 0 {
+		return
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return s, fmt.Errorf("link buffer read string[%d] not enough", n)
@@ -196,6 +208,9 @@ func (b *LinkBuffer) ReadString(n int) (s string, err error) {
 
 // ReadBinary implements Reader.
 func (b *LinkBuffer) ReadBinary(n int) (p []byte, err error) {
+	if n <= 0 {
+		return
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return p, fmt.Errorf("link buffer read binary[%d] not enough", n)
@@ -209,13 +224,13 @@ func (b *LinkBuffer) readBinary(n int) (p []byte) {
 
 	// single node
 	p = make([]byte, n)
-	l := b.firstReadLen()
-	if l >= n {
+	if b.isSingleNode(n) {
 		copy(p, b.read.Next(n))
 		return p
 	}
 	// multiple nodes
 	var pIdx int
+	var l int
 	for ack := n; ack > 0; ack = ack - l {
 		l = b.read.Len()
 		if l >= ack {
@@ -250,6 +265,9 @@ func (b *LinkBuffer) ReadByte() (p byte, err error) {
 //
 // Slice will automatically execute a Release.
 func (b *LinkBuffer) Slice(n int) (r Reader, err error) {
+	if n <= 0 {
+		return NewLinkBuffer(0), nil
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return r, fmt.Errorf("link buffer readv[%d] not enough", n)
@@ -267,13 +285,13 @@ func (b *LinkBuffer) Slice(n int) (r Reader, err error) {
 	}()
 
 	// single node
-	l := b.firstReadLen()
-	if l >= n {
+	if b.isSingleNode(n) {
 		node := b.read.Refer(n)
 		p.head, p.read, p.flush = node, node, node
 		return p, nil
 	}
 	// multiple nodes
+	var l = b.read.Len()
 	node := b.read.Refer(l)
 	b.read = b.read.next
 
@@ -297,6 +315,9 @@ func (b *LinkBuffer) Slice(n int) (r Reader, err error) {
 
 // Malloc pre-allocates memory, which is not readable, and becomes readable data after submission(e.g. Flush).
 func (b *LinkBuffer) Malloc(n int) (buf []byte, err error) {
+	if n <= 0 {
+		return
+	}
 	b.mallocSize += n
 	b.growth(n)
 	return b.write.Malloc(n), nil
@@ -309,6 +330,9 @@ func (b *LinkBuffer) MallocLen() (length int) {
 
 // MallocAck will keep the first n malloc bytes and discard the rest.
 func (b *LinkBuffer) MallocAck(n int) (err error) {
+	if n < 0 {
+		return fmt.Errorf("link buffer malloc ack[%d] invalid", n)
+	}
 	b.mallocSize = n
 	b.write = b.flush
 
@@ -363,6 +387,9 @@ func (b *LinkBuffer) Append(w Writer) (err error) {
 // you must actively submit before read the data.
 // The argument buf can't be used after calling WriteBuffer. (set it to nil)
 func (b *LinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) {
+	if buf == nil {
+		return
+	}
 	bufLen, bufMallocLen := buf.Len(), buf.MallocLen()
 	if bufLen+bufMallocLen <= 0 {
 		return nil
@@ -399,6 +426,9 @@ func (b *LinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) {
 
 // WriteString implements Writer.
 func (b *LinkBuffer) WriteString(s string) (n int, err error) {
+	if len(s) == 0 {
+		return
+	}
 	buf := unsafeStringToSlice(s)
 	return b.WriteBinary(buf)
 }
@@ -406,6 +436,9 @@ func (b *LinkBuffer) WriteString(s string) (n int, err error) {
 // WriteBinary implements Writer.
 func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) {
 	n = len(p)
+	if n == 0 {
+		return
+	}
 	b.mallocSize += n
 
 	// TODO: Verify that all nocopy is possible under mcache.
@@ -426,6 +459,9 @@ func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) {
 // WriteDirect cannot be mixed with WriteString or WriteBinary functions.
 func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error {
 	n := len(p)
+	if n == 0 || remainLen < 0 {
+		return nil
+	}
 	// find origin
 	origin := b.flush
 	malloc := b.mallocSize - remainLen // calculate the remaining malloc length
@@ -731,14 +767,18 @@ func (b *LinkBuffer) growth(n int) {
 	}
 }
 
-// firstReadLen returns the length of the first node greater than zero.
-func (b *LinkBuffer) firstReadLen() int {
+// isSingleNode determines whether reading needs to cross nodes.
+// The caller must ensure b.Len() > 0.
+func (b *LinkBuffer) isSingleNode(readN int) (single bool) {
+	if readN <= 0 {
+		return true
+	}
 	l := b.read.Len()
 	for l == 0 {
 		b.read = b.read.next
 		l = b.read.Len()
 	}
-	return l
+	return l >= readN
 }
 
 // zero-copy slice convert to string
diff --git a/nocopy_linkbuffer_race.go b/nocopy_linkbuffer_race.go
index 9eb6f7a4..e6fac4b9 100644
--- a/nocopy_linkbuffer_race.go
+++ b/nocopy_linkbuffer_race.go
@@ -32,7 +32,7 @@ import (
 // which is the threshold to use copy to minimize overhead.
 const BinaryInplaceThreshold = block4k
 
-// LinkBufferCap, which can be modified, marks the minimum value of each node of LinkBuffer.
+// LinkBufferCap that can be modified marks the minimum value of each node of LinkBuffer.
 var LinkBufferCap = block4k
 
 // NewLinkBuffer size defines the initial capacity, but there is no readable data.
@@ -51,7 +51,7 @@ func NewLinkBuffer(size ...int) *LinkBuffer {
 type LinkBuffer struct {
 	sync.Mutex
 	length     int32
-	mallocSize int32
+	mallocSize int
 
 	head  *linkBufferNode // release head
 	read  *linkBufferNode // read head
@@ -81,6 +81,9 @@ func (b *LinkBuffer) IsEmpty() (ok bool) {
 func (b *LinkBuffer) Next(n int) (p []byte, err error) {
 	b.Lock()
 	defer b.Unlock()
+	if n <= 0 {
+		return
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return p, fmt.Errorf("link buffer next[%d] not enough", n)
@@ -88,18 +91,18 @@ func (b *LinkBuffer) Next(n int) (p []byte, err error) {
 	b.recalLen(-n) // re-cal length
 
 	// single node
-	l := b.read.Len()
-	if l >= n {
+	if b.isSingleNode(n) {
 		return b.read.Next(n), nil
 	}
 	// multiple nodes
 	var pIdx int
-	if n > block1k {
-		p = mcache.Malloc(n)
+	if block1k < n && n <= mallocMax {
+		p = malloc(n, n)
 		b.caches = append(b.caches, p)
 	} else {
 		p = make([]byte, n)
 	}
+	var l int
 	for ack := n; ack > 0; ack = ack - l {
 		l = b.read.Len()
 		if l >= ack {
@@ -119,19 +122,27 @@ func (b *LinkBuffer) Next(n int) (p []byte, err error) {
 func (b *LinkBuffer) Peek(n int) (p []byte, err error) {
 	b.Lock()
 	defer b.Unlock()
+	if n <= 0 {
+		return
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return p, fmt.Errorf("link buffer peek[%d] not enough", n)
 	}
-	var node = b.read
 	// single node
-	l := node.Len()
-	if l >= n {
-		return node.Peek(n), nil
+	if b.isSingleNode(n) {
+		return b.read.Peek(n), nil
 	}
 	// multiple nodes
 	var pIdx int
-	p = make([]byte, n)
+	if block1k < n && n <= mallocMax {
+		p = malloc(n, n)
+		b.caches = append(b.caches, p)
+	} else {
+		p = make([]byte, n)
+	}
+	var node = b.read
+	var l int
 	for ack := n; ack > 0; ack = ack - l {
 		l = node.Len()
 		if l >= ack {
@@ -150,6 +161,9 @@ func (b *LinkBuffer) Skip(n int) (err error) {
 	b.Lock()
 	defer b.Unlock()
+	if n <= 0 {
+		return
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return fmt.Errorf("link buffer skip[%d] not enough", n)
@@ -185,8 +199,9 @@ func (b *LinkBuffer) release() (err error) {
 		b.head = b.head.next
 		node.Release()
 	}
-	for _, buf := range b.caches {
-		mcache.Free(buf)
+	for i := range b.caches {
+		free(b.caches[i])
+		b.caches[i] = nil
 	}
 	b.caches = b.caches[:0]
 	return nil
@@ -196,6 +211,9 @@ func (b *LinkBuffer) ReadString(n int) (s string, err error) {
 	b.Lock()
 	defer b.Unlock()
+	if n <= 0 {
+		return
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return s, fmt.Errorf("link buffer read string[%d] not enough", n)
@@ -207,6 +225,9 @@ func (b *LinkBuffer) ReadBinary(n int) (p []byte, err error) {
 	b.Lock()
 	defer b.Unlock()
+	if n <= 0 {
+		return
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return p, fmt.Errorf("link buffer read binary[%d] not enough", n)
@@ -220,13 +241,13 @@ func (b *LinkBuffer) readBinary(n int) (p []byte) {
 
 	// single node
 	p = make([]byte, n)
-	l := b.read.Len()
-	if l >= n {
+	if b.isSingleNode(n) {
 		copy(p, b.read.Next(n))
 		return p
 	}
 	// multiple nodes
 	var pIdx int
+	var l int
 	for ack := n; ack > 0; ack = ack - l {
 		l = b.read.Len()
 		if l >= ack {
@@ -247,7 +268,7 @@ func (b *LinkBuffer) ReadByte() (p byte, err error) {
 	defer b.Unlock()
 	// check whether enough or not.
 	if b.Len() < 1 {
-		return p, fmt.Errorf("link buffer read byte is empty")
+		return p, errors.New("link buffer read byte is empty")
 	}
 	b.recalLen(-1) // re-cal length
 	for {
@@ -265,6 +286,9 @@ func (b *LinkBuffer) ReadByte() (p byte, err error) {
 func (b *LinkBuffer) Slice(n int) (r Reader, err error) {
 	b.Lock()
 	defer b.Unlock()
+	if n <= 0 {
+		return NewLinkBuffer(0), nil
+	}
 	// check whether enough or not.
 	if b.Len() < n {
 		return r, fmt.Errorf("link buffer readv[%d] not enough", n)
@@ -282,13 +306,13 @@ func (b *LinkBuffer) Slice(n int) (r Reader, err error) {
 	}()
 
 	// single node
-	l := b.read.Len()
-	if l >= n {
+	if b.isSingleNode(n) {
 		node := b.read.Refer(n)
 		p.head, p.read, p.flush = node, node, node
 		return p, nil
 	}
 	// multiple nodes
+	var l = b.read.Len()
 	node := b.read.Refer(l)
 	b.read = b.read.next
 
@@ -314,21 +338,30 @@ func (b *LinkBuffer) Malloc(n int) (buf []byte, err error) {
 	b.Lock()
 	defer b.Unlock()
-	b.recalMallocLen(n)
+	if n <= 0 {
+		return
+	}
+	b.mallocSize += n
 	b.growth(n)
 	return b.write.Malloc(n), nil
 }
 
 // MallocLen implements Writer.
 func (b *LinkBuffer) MallocLen() (length int) {
-	return int(atomic.LoadInt32(&b.mallocSize))
+	b.Lock()
+	defer b.Unlock()
+	length = b.mallocSize
+	return length
 }
 
 // MallocAck will keep the first n malloc bytes and discard the rest.
 func (b *LinkBuffer) MallocAck(n int) (err error) {
 	b.Lock()
 	defer b.Unlock()
-	atomic.StoreInt32(&b.mallocSize, int32(n))
+	if n < 0 {
+		return fmt.Errorf("link buffer malloc ack[%d] invalid", n)
+	}
+	b.mallocSize = n
 	b.write = b.flush
 
 	var l int
@@ -351,16 +384,21 @@ func (b *LinkBuffer) Flush() (err error) {
 	b.Lock()
 	defer b.Unlock()
-	// 计算 malloc size
-	atomic.StoreInt32(&b.mallocSize, 0)
+	b.mallocSize = 0
+	// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
+	if cap(b.write.buf) > pagesize {
+		b.write.next = newLinkBufferNode(0)
+		b.write = b.write.next
+	}
 	var n int
-	for b.flush != b.write {
-		n += b.flush.malloc - len(b.flush.buf)
-		b.flush.buf = b.flush.buf[:b.flush.malloc]
-		b.flush = b.flush.next
+	for node := b.flush; node != b.write.next; node = node.next {
+		delta := node.malloc - len(node.buf)
+		if delta > 0 {
+			n += delta
+			node.buf = node.buf[:node.malloc]
+		}
 	}
-	n += b.flush.malloc - len(b.flush.buf)
-	b.flush.buf = b.flush.buf[:b.flush.malloc]
+	b.flush = b.write
 	// re-cal length
 	b.recalLen(n)
 	return nil
@@ -381,6 +419,9 @@ func (b *LinkBuffer) Append(w Writer) (err error) {
 func (b *LinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) {
 	b.Lock()
 	defer b.Unlock()
+	if buf == nil {
+		return
+	}
 	bufLen, bufMallocLen := buf.Len(), buf.MallocLen()
 	if bufLen+bufMallocLen <= 0 {
 		return nil
@@ -411,12 +452,15 @@ func (b *LinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) {
 	if bufLen > 0 {
 		b.recalLen(bufLen)
 	}
-	b.recalMallocLen(bufMallocLen)
+	b.mallocSize += bufMallocLen
 	return nil
 }
 
 // WriteString implements Writer.
 func (b *LinkBuffer) WriteString(s string) (n int, err error) {
+	if len(s) == 0 {
+		return
+	}
 	buf := unsafeStringToSlice(s)
 	return b.WriteBinary(buf)
 }
@@ -426,8 +470,12 @@ func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) {
 	b.Lock()
 	defer b.Unlock()
 	n = len(p)
-	b.recalMallocLen(n)
+	if n == 0 {
+		return
+	}
+	b.mallocSize += n
+
 	// TODO: Verify that all nocopy is possible under mcache.
 	if n > BinaryInplaceThreshold {
 		// expand buffer directly with nocopy
 		b.write.next = newLinkBufferNode(0)
@@ -447,9 +495,12 @@ func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error {
 	b.Lock()
 	defer b.Unlock()
 	n := len(p)
+	if n == 0 || remainLen < 0 {
+		return nil
+	}
 	// find origin
 	origin := b.flush
-	malloc := int(atomic.LoadInt32(&b.mallocSize)) - remainLen // calculate the remaining malloc length
+	malloc := b.mallocSize - remainLen // calculate the remaining malloc length
 	for t := origin.malloc - len(origin.buf); t <= malloc; t = origin.malloc - len(origin.buf) {
 		malloc -= t
 		origin = origin.next
@@ -479,7 +530,7 @@ func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error {
 		b.write = b.write.next
 	}
 
-	b.recalMallocLen(n)
+	b.mallocSize += n
 	return nil
 }
@@ -497,8 +548,7 @@ func (b *LinkBuffer) Close() (err error) {
 	b.Lock()
 	defer b.Unlock()
 	atomic.StoreInt32(&b.length, 0)
-	atomic.StoreInt32(&b.mallocSize, 0)
-
+	b.mallocSize = 0
 	// just release all
 	for node := b.head; node != nil; {
 		nd := node
@@ -564,12 +614,6 @@ func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) {
 		l = maxSize
 		b.write.next = newLinkBufferNode(maxSize)
 		b.write = b.write.next
-
-		// If there is no data in read node, then point it to next one.
-		if b.Len() == 0 {
-			b.read, b.flush = b.write, b.write
-		}
-
 	}
 	if l > bookSize {
 		l = bookSize
@@ -594,8 +638,6 @@ func (b *LinkBuffer) bookAck(n int) (length int, err error) {
 
 // calcMaxSize will calculate the data size between two Release()
 func (b *LinkBuffer) calcMaxSize() (sum int) {
-	b.Lock()
-	defer b.Unlock()
 	for node := b.head; node != b.read; node = node.next {
 		sum += len(node.buf)
 	}
@@ -606,8 +648,6 @@
 // resetTail will reset tail node or add an empty tail node to
 // guarantee the tail node is not larger than 8KB
 func (b *LinkBuffer) resetTail(maxSize int) {
-	b.Lock()
-	defer b.Unlock()
 	// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
 	if maxSize <= pagesize {
 		b.write.Reset()
@@ -621,15 +661,6 @@ func (b *LinkBuffer) resetTail(maxSize int) {
 	return
 }
 
-func (node *linkBufferNode) Reset() {
-	if node.origin != nil || atomic.LoadInt32(&node.refer) != 1 {
-		return
-	}
-	node.off, node.malloc = 0, 0
-	node.buf = node.buf[:0]
-	return
-}
-
 // Reset resets the buffer to be empty,
 // but it retains the underlying storage for use by future writes.
 // Reset is the same as Truncate(0).
@@ -646,10 +677,13 @@ func (b *LinkBuffer) recalLen(delta int) (length int) {
 	return int(atomic.AddInt32(&b.length, int32(delta)))
 }
 
-// recalMallocLen re-calculate the malloc length
-func (b *LinkBuffer) recalMallocLen(delta int) (err error) {
-	atomic.AddInt32(&b.mallocSize, int32(delta))
-	return nil
+func (node *linkBufferNode) Reset() {
+	if node.origin != nil || atomic.LoadInt32(&node.refer) != 1 {
+		return
+	}
+	node.off, node.malloc = 0, 0
+	node.buf = node.buf[:0]
+	return
 }
 
 // ------------------------------------------ implement link node ------------------------------------------
@@ -665,7 +699,7 @@ func newLinkBufferNode(size int) *linkBufferNode {
 	if size < LinkBufferCap {
 		size = LinkBufferCap
 	}
-	node.buf = mcache.Malloc(0, size)
+	node.buf = malloc(0, size)
 	return node
 }
@@ -753,7 +787,7 @@ func (node *linkBufferNode) Release() (err error) {
 		if node.readonly {
 			node.readonly = false
 		} else {
-			mcache.Free(node.buf)
+			free(node.buf)
 		}
 		node.buf = nil
 		linkedPool.Put(node)
@@ -779,17 +813,50 @@ func (b *LinkBuffer) growth(n int) {
 	}
 }
 
+// isSingleNode determines whether reading needs to cross nodes.
+// The caller must ensure b.Len() > 0.
+func (b *LinkBuffer) isSingleNode(readN int) (single bool) {
+	if readN <= 0 {
+		return true
+	}
+	l := b.read.Len()
+	for l == 0 {
+		b.read = b.read.next
+		l = b.read.Len()
+	}
+	return l >= readN
+}
+
 // zero-copy slice convert to string
 func unsafeSliceToString(b []byte) string {
 	return *(*string)(unsafe.Pointer(&b))
 }
 
 // zero-copy slice convert to string
-func unsafeStringToSlice(s string) []byte {
-	l := len(s)
-	return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
-		Data: (*(*reflect.StringHeader)(unsafe.Pointer(&s))).Data,
-		Len:  l,
-		Cap:  l,
-	}))
+func unsafeStringToSlice(s string) (b []byte) {
+	p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data)
+	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+	hdr.Data = uintptr(p)
+	hdr.Cap = len(s)
+	hdr.Len = len(s)
+	return b
+}
+
+// mallocMax is 8MB
+const mallocMax = block8k * block1k
+
+// malloc allocates a buffer from mcache, falling back to the heap when capacity exceeds mallocMax.
+func malloc(size, capacity int) []byte {
+	if capacity > mallocMax {
+		return make([]byte, size, capacity)
+	}
+	return mcache.Malloc(size, capacity)
+}
+
+// free recycles the buffer to mcache unless its capacity exceeds mallocMax.
+func free(buf []byte) {
+	if cap(buf) > mallocMax {
+		return
+	}
+	mcache.Free(buf)
+}
diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go
index 4a7ff715..33c4cb09 100644
--- a/nocopy_linkbuffer_test.go
+++ b/nocopy_linkbuffer_test.go
@@ -81,6 +81,91 @@ func TestLinkBuffer(t *testing.T) {
 	Equal(t, buf.Len(), 100)
 }
 
+// TestLinkBufferWithInvalid tests more cases where n is invalid (zero or negative).
+func TestLinkBufferWithInvalid(t *testing.T) {
+	// clean & new
+	LinkBufferCap = 128
+
+	buf := NewLinkBuffer()
+	Equal(t, buf.Len(), 0)
+	MustTrue(t, buf.IsEmpty())
+
+	for n := 0; n > -5; n-- {
+		// test writer
+		p, err := buf.Malloc(n)
+		Equal(t, len(p), 0)
+		Equal(t, buf.MallocLen(), 0)
+		Equal(t, buf.Len(), 0)
+		MustNil(t, err)
+
+		var wn int
+		wn, err = buf.WriteString("")
+		Equal(t, wn, 0)
+		Equal(t, buf.MallocLen(), 0)
+		Equal(t, buf.Len(), 0)
+		MustNil(t, err)
+
+		wn, err = buf.WriteBinary(nil)
+		Equal(t, wn, 0)
+		Equal(t, buf.MallocLen(), 0)
+		Equal(t, buf.Len(), 0)
+		MustNil(t, err)
+
+		err = buf.WriteDirect(nil, n)
+		Equal(t, buf.MallocLen(), 0)
+		Equal(t, buf.Len(), 0)
+		MustNil(t, err)
+
+		var w *LinkBuffer
+		err = buf.Append(w)
+		Equal(t, buf.MallocLen(), 0)
+		Equal(t, buf.Len(), 0)
+		MustNil(t, err)
+
+		err = buf.MallocAck(n)
+		Equal(t, buf.MallocLen(), 0)
+		Equal(t, buf.Len(), 0)
+		if n == 0 {
+			MustNil(t, err)
+		} else {
+			MustTrue(t, err != nil)
+		}
+
+		err = buf.Flush()
+		MustNil(t, err)
+
+		// test reader
+		p, err = buf.Next(n)
+		Equal(t, len(p), 0)
+		MustNil(t, err)
+
+		p, err = buf.Peek(n)
+		Equal(t, len(p), 0)
+		MustNil(t, err)
+
+		err = buf.Skip(n)
+		Equal(t, len(p), 0)
+		MustNil(t, err)
+
+		var s string
+		s, err = buf.ReadString(n)
+		Equal(t, len(s), 0)
+		MustNil(t, err)
+
+		p, err = buf.ReadBinary(n)
+		Equal(t, len(p), 0)
+		MustNil(t, err)
+
+		var r Reader
+		r, err = buf.Slice(n)
+		Equal(t, r.Len(), 0)
+		MustNil(t, err)
+
+		err = buf.Release()
+		MustNil(t, err)
+	}
+}
+
 // cross-block operation test
 func TestLinkBufferIndex(t *testing.T) {
 	// clean & new
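For readers trying the patch out, here is a minimal usage sketch of the new guard semantics (illustrative only, not part of the diff; it assumes this patch is applied and uses only the exported github.com/cloudwego/netpoll API):

package main

import (
	"fmt"

	"github.com/cloudwego/netpoll"
)

func main() {
	buf := netpoll.NewLinkBuffer()

	// Malloc'd space becomes readable only after Flush.
	p, _ := buf.Malloc(5)
	copy(p, "hello")
	_ = buf.Flush()

	// With this patch, a non-positive n is an explicit no-op; previously a
	// negative n could slip past the `b.Len() < n` check and corrupt the
	// buffer's length accounting via recalLen(-n).
	empty, err := buf.Next(-1)
	fmt.Println(len(empty), err) // 0 <nil>

	// A negative MallocAck is now rejected explicitly.
	fmt.Println(buf.MallocAck(-1)) // link buffer malloc ack[-1] invalid

	// Regular reads are unchanged.
	data, _ := buf.Next(5)
	fmt.Println(string(data)) // hello
}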
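And a standalone sketch of the size-capping policy behind the new malloc/free helpers (the constant mirrors the diff's block8k * block1k; the demo in main is hypothetical, not from the patch):

package main

import (
	"fmt"

	"github.com/bytedance/gopkg/lang/mcache"
)

// mallocMax mirrors the patch: block8k * block1k = 8MB.
const mallocMax = 8 * 1024 * 1024

// malloc draws from mcache only while the requested capacity fits under
// mallocMax; oversized buffers come from the regular heap so they are
// never recycled into (and never bloat) the cache.
func malloc(size, capacity int) []byte {
	if capacity > mallocMax {
		return make([]byte, size, capacity)
	}
	return mcache.Malloc(size, capacity)
}

// free is the mirror image: oversized buffers are left to the GC.
func free(buf []byte) {
	if cap(buf) > mallocMax {
		return
	}
	mcache.Free(buf)
}

func main() {
	small := malloc(0, 4096)      // served by mcache
	big := malloc(0, mallocMax+1) // falls back to make()
	fmt.Println(cap(small) >= 4096, cap(big) == mallocMax+1) // true true
	free(small) // recycled into mcache
	free(big)   // skipped; left for the GC to reclaim
}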