Skip to content

Commit

Permalink
use net.Buffers to reduce memory allocations.
Browse files Browse the repository at this point in the history
  • Loading branch information
zdyj3170101136 committed Sep 18, 2024
1 parent a64f2cd commit d344f50
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 7 deletions.
10 changes: 4 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"strconv"
"strings"
Expand Down Expand Up @@ -94,6 +93,7 @@ func (c *Client) Close() error {
return errors.Wrap(err, "conn")
}

c.buf.Reset() // avoid memory leak.
return nil
}

Expand Down Expand Up @@ -269,15 +269,13 @@ func (c *Client) flushBuf(ctx context.Context, b *proto.Buffer) error {
// Reset deadline.
defer func() { _ = c.conn.SetWriteDeadline(time.Time{}) }()
}
n, err := c.conn.Write(b.Buf)
b.Buffers = append(b.Buffers, b.Buf)
n, err := b.Buffers.WriteTo(c.conn)
if err != nil {
return errors.Wrap(err, "write")
}
if n != len(b.Buf) {
return errors.Wrap(io.ErrShortWrite, "wrote less than expected")
}
if ce := c.lg.Check(zap.DebugLevel, "Flush"); ce != nil {
ce.Write(zap.Int("bytes", n))
ce.Write(zap.Int("bytes", int(n)))
}
b.Reset()
return nil
Expand Down
9 changes: 8 additions & 1 deletion proto/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,14 @@ func (b Block) EncodeRawBlock(buf *Buffer, version int, input []InputColumn) err
if v, ok := col.Data.(StateEncoder); ok {
v.EncodeState(buf)
}
col.Data.EncodeColumn(buf)
if v, ok := col.Data.(*ColStr); ok {
// TODO SUPPORT OTHER COLUMN.
buf.Buffers = append(buf.Buffers, buf.Buf)
buf.Buf = nil
buf.Buffers = append(buf.Buffers, v.Buf)
} else {
col.Data.EncodeColumn(buf)
}
}
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions proto/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"encoding/binary"
"io"
"math"
"net"
)

// Buffer implements ClickHouse binary protocol encoding.
type Buffer struct {
Buf []byte
net.Buffers
}

// Reader returns new *Reader from *Buffer.
Expand Down Expand Up @@ -45,6 +47,7 @@ func (b *Buffer) Encode(e Encoder) {
// Reset buffer to zero length.
func (b *Buffer) Reset() {
b.Buf = b.Buf[:0]
b.Buffers = nil
}

// Read implements io.Reader.
Expand Down
2 changes: 2 additions & 0 deletions proto/col_str.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ColStr struct {

// Append string to column.
func (c *ColStr) Append(v string) {
c.Buf = binary.AppendUvarint(c.Buf, uint64(len(v)))
start := len(c.Buf)
c.Buf = append(c.Buf, v...)
end := len(c.Buf)
Expand All @@ -29,6 +30,7 @@ func (c *ColStr) Append(v string) {

// AppendBytes append byte slice as string to column.
func (c *ColStr) AppendBytes(v []byte) {
c.Buf = binary.AppendUvarint(c.Buf, uint64(len(v)))
start := len(c.Buf)
c.Buf = append(c.Buf, v...)
end := len(c.Buf)
Expand Down
1 change: 1 addition & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (c *Client) encodeBlock(ctx context.Context, tableName string, input []prot
// Note: only blocks are compressed.
// See "Compressible" method of server or client code for reference.
if c.compression == proto.CompressionEnabled {
// TODO SUPPORT COMPRESS
data := c.buf.Buf[start:]
if err := c.compressor.Compress(c.compressionMethod, data); err != nil {
return errors.Wrap(err, "compress")
Expand Down

0 comments on commit d344f50

Please sign in to comment.