diff --git a/client.go b/client.go index a8f0e2f7..e450caf4 100644 --- a/client.go +++ b/client.go @@ -30,6 +30,7 @@ type Client struct { lg *zap.Logger conn net.Conn buf *proto.Buffer + writer *proto.Writer reader *proto.Reader info proto.ClientHello server proto.ServerHello @@ -500,6 +501,11 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) { c.compression = proto.CompressionDisabled } + if _, ok := conn.(*net.TCPConn); writevAvailable && // writev available only on Unix platforms. + ok && c.compression == proto.CompressionDisabled { // Could not be used with TLS and compression. + c.writer = proto.NewWriter(c.conn, c.buf) + } + handshakeCtx, cancel := context.WithTimeout(ctx, opt.HandshakeTimeout) defer cancel() if err := c.handshake(handshakeCtx); err != nil { diff --git a/query.go b/query.go index a3dcd17b..be165b83 100644 --- a/query.go +++ b/query.go @@ -292,8 +292,18 @@ func (c *Client) encodeBlock(ctx context.Context, tableName string, input []prot BucketNum: -1, } } - if err := b.EncodeBlock(c.buf, c.protocolVersion, input); err != nil { - return errors.Wrap(err, "encode") + + if w := c.writer; w != nil { + if err := b.WriteBlock(w, c.protocolVersion, input); err != nil { + return err + } + if err := c.flushWritev(ctx); err != nil { + return errors.Wrap(err, "write buffers") + } + } else { + if err := b.EncodeBlock(c.buf, c.protocolVersion, input); err != nil { + return errors.Wrap(err, "encode") + } } // Performing compression. @@ -311,6 +321,20 @@ func (c *Client) encodeBlock(ctx context.Context, tableName string, input []prot return nil } +func (c *Client) flushWritev(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return errors.Wrap(err, "context") + } + if deadline, ok := ctx.Deadline(); ok { + if err := c.conn.SetWriteDeadline(deadline); err != nil { + return errors.Wrap(err, "set write deadline") + } + // Reset deadline. + defer func() { _ = c.conn.SetWriteDeadline(time.Time{}) }() + } + return c.writer.Flush() +} + // encodeBlankBlock encodes block with zero columns and rows which is special // case for "end of data". func (c *Client) encodeBlankBlock(ctx context.Context) error { diff --git a/writev_other.go b/writev_other.go new file mode 100644 index 00000000..ed60c214 --- /dev/null +++ b/writev_other.go @@ -0,0 +1,5 @@ +//go:build !unix + +package ch + +const writevAvailable = false diff --git a/writev_unix.go b/writev_unix.go new file mode 100644 index 00000000..3e80a2e7 --- /dev/null +++ b/writev_unix.go @@ -0,0 +1,5 @@ +//go:build unix + +package ch + +const writevAvailable = true