Skip to content

Commit

Permalink
feat(ch): use new writer API to encode blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Aug 16, 2024
1 parent d8624c3 commit 825df30
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 26 deletions.
22 changes: 10 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@ import (
// Client implements ClickHouse binary protocol client on top of
// single TCP connection.
type Client struct {
lg *zap.Logger
conn net.Conn
buf *proto.Buffer
vec net.Buffers
useWritev bool
reader *proto.Reader
info proto.ClientHello
server proto.ServerHello
version clientVersion
quotaKey string
lg *zap.Logger
conn net.Conn
buf *proto.Buffer
writer *proto.Writer
reader *proto.Reader
info proto.ClientHello
server proto.ServerHello
version clientVersion
quotaKey string

mux sync.Mutex
closed bool
Expand Down Expand Up @@ -503,8 +502,7 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {
}
if _, ok := conn.(*net.TCPConn); writevAvailable && // writev available only on Unix platforms.
ok && c.compression == proto.CompressionDisabled { // Could not be used with TLS and compression.
c.useWritev = true
c.vec = make(net.Buffers, 0, 16)
c.writer = proto.NewWriter(c.conn, c.buf)
}

handshakeCtx, cancel := context.WithTimeout(ctx, opt.HandshakeTimeout)
Expand Down
20 changes: 6 additions & 14 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,18 +292,13 @@ func (c *Client) encodeBlock(ctx context.Context, tableName string, input []prot
BucketNum: -1,
}
}
if c.useWritev {
bw := &proto.BlockWriter{
Buf: c.buf,
Vec: append(c.vec[:0], c.buf.Buf),
if w := c.writer; w != nil {
if err := b.WriteBlock(w, c.protocolVersion, input); err != nil {
return err
}
if err := b.WriteBlock(bw, c.protocolVersion, input); err != nil {
return errors.Wrap(err, "build writev block")
}
if err := c.flushWritev(ctx, bw.Vec); err != nil {
if err := c.flushWritev(ctx); err != nil {
return errors.Wrap(err, "write buffers")
}
c.buf.Reset()
} else {
if err := b.EncodeBlock(c.buf, c.protocolVersion, input); err != nil {
return errors.Wrap(err, "encode")
Expand All @@ -325,7 +320,7 @@ func (c *Client) encodeBlock(ctx context.Context, tableName string, input []prot
return nil
}

func (c *Client) flushWritev(ctx context.Context, vec net.Buffers) error {
func (c *Client) flushWritev(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "context")
}
Expand All @@ -336,10 +331,7 @@ func (c *Client) flushWritev(ctx context.Context, vec net.Buffers) error {
// Reset deadline.
defer func() { _ = c.conn.SetWriteDeadline(time.Time{}) }()
}
if _, err := vec.WriteTo(c.conn); err != nil {
return err
}
return nil
return c.writer.Flush()
}

// encodeBlankBlock encodes block with zero columns and rows which is special
Expand Down

0 comments on commit 825df30

Please sign in to comment.