diff --git a/client.go b/client.go index 535dbb48..050b4d34 100644 --- a/client.go +++ b/client.go @@ -4,7 +4,6 @@ import ( "context" "crypto/tls" "fmt" - "io" "net" "strconv" "strings" @@ -94,6 +93,7 @@ func (c *Client) Close() error { return errors.Wrap(err, "conn") } + c.buf.Reset() // avoid memory leak. return nil } @@ -269,15 +269,13 @@ func (c *Client) flushBuf(ctx context.Context, b *proto.Buffer) error { // Reset deadline. defer func() { _ = c.conn.SetWriteDeadline(time.Time{}) }() } - n, err := c.conn.Write(b.Buf) + b.Buffers = append(b.Buffers, b.Buf) + n, err := b.Buffers.WriteTo(c.conn) if err != nil { return errors.Wrap(err, "write") } - if n != len(b.Buf) { - return errors.Wrap(io.ErrShortWrite, "wrote less than expected") - } if ce := c.lg.Check(zap.DebugLevel, "Flush"); ce != nil { - ce.Write(zap.Int("bytes", n)) + ce.Write(zap.Int("bytes", int(n))) } b.Reset() return nil diff --git a/proto/block.go b/proto/block.go index 34548067..b0acfab9 100644 --- a/proto/block.go +++ b/proto/block.go @@ -178,7 +178,14 @@ func (b Block) EncodeRawBlock(buf *Buffer, version int, input []InputColumn) err if v, ok := col.Data.(StateEncoder); ok { v.EncodeState(buf) } - col.Data.EncodeColumn(buf) + if v, ok := col.Data.(*ColStr); ok { + // TODO SUPPORT OTHER COLUMN. + buf.Buffers = append(buf.Buffers, buf.Buf) + buf.Buf = nil + buf.Buffers = append(buf.Buffers, v.Buf) + } else { + col.Data.EncodeColumn(buf) + } } return nil } diff --git a/proto/buffer.go b/proto/buffer.go index e2b1e8f1..370f4f96 100644 --- a/proto/buffer.go +++ b/proto/buffer.go @@ -5,11 +5,13 @@ import ( "encoding/binary" "io" "math" + "net" ) // Buffer implements ClickHouse binary protocol encoding. type Buffer struct { Buf []byte + net.Buffers } // Reader returns new *Reader from *Buffer. @@ -45,6 +47,7 @@ func (b *Buffer) Encode(e Encoder) { // Reset buffer to zero length. func (b *Buffer) Reset() { b.Buf = b.Buf[:0] + b.Buffers = nil } // Read implements io.Reader. diff --git a/proto/col_str.go b/proto/col_str.go index 8f48ad7f..574cb047 100644 --- a/proto/col_str.go +++ b/proto/col_str.go @@ -21,6 +21,7 @@ type ColStr struct { // Append string to column. func (c *ColStr) Append(v string) { + c.Buf = binary.AppendUvarint(c.Buf, uint64(len(v))) start := len(c.Buf) c.Buf = append(c.Buf, v...) end := len(c.Buf) @@ -29,6 +30,7 @@ func (c *ColStr) Append(v string) { // AppendBytes append byte slice as string to column. func (c *ColStr) AppendBytes(v []byte) { + c.Buf = binary.AppendUvarint(c.Buf, uint64(len(v))) start := len(c.Buf) c.Buf = append(c.Buf, v...) end := len(c.Buf) diff --git a/query.go b/query.go index a3dcd17b..762b2c98 100644 --- a/query.go +++ b/query.go @@ -301,6 +301,7 @@ func (c *Client) encodeBlock(ctx context.Context, tableName string, input []prot // Note: only blocks are compressed. // See "Compressible" method of server or client code for reference. if c.compression == proto.CompressionEnabled { + // TODO SUPPORT COMPRESS data := c.buf.Buf[start:] if err := c.compressor.Compress(c.compressionMethod, data); err != nil { return errors.Wrap(err, "compress")