Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ch): use writev when possible #413

Merged
merged 11 commits into from
Sep 25, 2024
72 changes: 50 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
type Client struct {
lg *zap.Logger
conn net.Conn
buf *proto.Buffer
writer *proto.Writer
reader *proto.Reader
info proto.ClientHello
server proto.ServerHello
Expand Down Expand Up @@ -276,19 +276,38 @@ func (c *Client) flushBuf(ctx context.Context, b *proto.Buffer) error {
if n != len(b.Buf) {
return errors.Wrap(io.ErrShortWrite, "wrote less than expected")
}
if ce := c.lg.Check(zap.DebugLevel, "Flush"); ce != nil {
if ce := c.lg.Check(zap.DebugLevel, "Buffer flush"); ce != nil {
ce.Write(zap.Int("bytes", n))
}
b.Reset()
return nil
}

func (c *Client) flush(ctx context.Context) error {
return c.flushBuf(ctx, c.buf)
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "context")
}
if deadline, ok := ctx.Deadline(); ok {
if err := c.conn.SetWriteDeadline(deadline); err != nil {
return errors.Wrap(err, "set write deadline")
}
// Reset deadline.
defer func() { _ = c.conn.SetWriteDeadline(time.Time{}) }()
}
n, err := c.writer.Flush()
if err != nil {
return err
}
if ce := c.lg.Check(zap.DebugLevel, "Flush"); ce != nil {
ce.Write(zap.Int64("bytes", n))
}
return nil
}

func (c *Client) encode(v proto.AwareEncoder) {
v.EncodeAware(c.buf, c.protocolVersion)
c.writer.ChainBuffer(func(b *proto.Buffer) {
v.EncodeAware(b, c.protocolVersion)
})
}

//go:generate go run github.com/dmarkham/enumer -transform upper -type Compression -trimprefix Compression -output compression_enum.go
Expand Down Expand Up @@ -451,9 +470,32 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {
ctx = newCtx
defer span.End()
}

var (
compressor = compress.NewWriterWithLevel(compress.Level(opt.CompressionLevel))
compression proto.Compression
compressionMethod compress.Method
)
switch opt.Compression {
case CompressionLZ4:
compression = proto.CompressionEnabled
compressionMethod = compress.LZ4
case CompressionLZ4HC:
compression = proto.CompressionEnabled
compressionMethod = compress.LZ4HC
case CompressionZSTD:
compression = proto.CompressionEnabled
compressionMethod = compress.ZSTD
case CompressionNone:
compression = proto.CompressionEnabled
compressionMethod = compress.None
default:
compression = proto.CompressionDisabled
}

c := &Client{
conn: conn,
buf: new(proto.Buffer),
writer: proto.NewWriter(conn, new(proto.Buffer)),
reader: proto.NewReader(conn),
settings: opt.Settings,
lg: opt.Logger,
Expand All @@ -464,7 +506,9 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {

readTimeout: opt.ReadTimeout,

compressor: compress.NewWriterWithLevel(compress.Level(opt.CompressionLevel)),
compression: compression,
compressionMethod: compressionMethod,
compressor: compressor,

version: ver,
protocolVersion: opt.ProtocolVersion,
Expand All @@ -480,22 +524,6 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {
Password: opt.Password,
},
}
switch opt.Compression {
case CompressionLZ4:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.LZ4
case CompressionLZ4HC:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.LZ4HC
case CompressionZSTD:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.ZSTD
case CompressionNone:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.None
default:
c.compression = proto.CompressionDisabled
}

handshakeCtx, cancel := context.WithTimeout(ctx, opt.HandshakeTimeout)
defer cancel()
Expand Down
9 changes: 6 additions & 3 deletions handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (

func (c *Client) encodeAddendum() {
if proto.FeatureQuotaKey.In(c.protocolVersion) {
c.buf.PutString(c.quotaKey)
c.writer.ChainBuffer(func(b *proto.Buffer) {
b.PutString(c.quotaKey)
})
}
}

Expand Down Expand Up @@ -45,8 +47,9 @@ func (c *Client) handshake(ctx context.Context) error {
wg.Go(func() error {
defer cancel()

c.buf.Reset()
c.info.Encode(c.buf)
c.writer.ChainBuffer(func(b *proto.Buffer) {
c.info.Encode(b)
})
if err := c.flush(wgCtx); err != nil {
return errors.Wrap(err, "flush")
}
Expand Down
66 changes: 66 additions & 0 deletions insert_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package ch

import (
"context"
"fmt"
"testing"

"github.com/go-faster/errors"

"github.com/ClickHouse/ch-go/cht"
"github.com/ClickHouse/ch-go/proto"
)

func BenchmarkInsert(b *testing.B) {
cht.Skip(b)
srv := cht.New(b)

bench := func(rows int) func(b *testing.B) {
return func(b *testing.B) {
ctx := context.Background()
c, err := Dial(ctx, Options{
Address: srv.TCP,
Compression: CompressionDisabled,
})
if err != nil {
b.Fatal(errors.Wrap(err, "dial"))
}
defer func() { _ = c.Close() }()

if err := c.Do(ctx, Query{
Body: "CREATE TABLE IF NOT EXISTS test_table (id Int64) ENGINE = Null",
}); err != nil {
b.Fatal(err)
}

var id proto.ColInt64
for i := 0; i < rows; i++ {
id = append(id, 1)
}

b.SetBytes(int64(rows) * 8)
b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
if err := c.Do(ctx, Query{
Body: "INSERT INTO test_table VALUES",
Input: []proto.InputColumn{
{Name: "id", Data: id},
},
}); err != nil {
b.Fatal()
}
}
}
}
for _, rows := range []int{
10_000,
100_000,
1_000_000,
10_000_000,
100_000_000,
} {
b.Run(fmt.Sprintf("Rows%d", rows), bench(rows))
}
}
4 changes: 3 additions & 1 deletion ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ func (c *Client) Ping(ctx context.Context) (err error) {
span.End()
}()
}
c.buf.Encode(proto.ClientCodePing)
c.writer.ChainBuffer(func(b *proto.Buffer) {
b.Encode(proto.ClientCodePing)
})
if err := c.flush(ctx); err != nil {
return errors.Wrap(err, "flush")
}
Expand Down
32 changes: 32 additions & 0 deletions proto/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,38 @@ func (b Block) EncodeRawBlock(buf *Buffer, version int, input []InputColumn) err
return nil
}

func (b Block) WriteBlock(w *Writer, version int, input []InputColumn) error {
w.ChainBuffer(func(buf *Buffer) {
if FeatureBlockInfo.In(version) {
b.Info.Encode(buf)
}
buf.PutInt(b.Columns)
buf.PutInt(b.Rows)
})

for _, col := range input {
if r := col.Data.Rows(); r != b.Rows {
return errors.Errorf("%q has %d rows, expected %d", col.Name, r, b.Rows)
}
w.ChainBuffer(func(buf *Buffer) {
col.EncodeStart(buf, version)
})
if v, ok := col.Data.(Preparable); ok {
if err := v.Prepare(); err != nil {
return errors.Wrapf(err, "prepare %q", col.Name)
}
}
if col.Data.Rows() == 0 {
continue
}
if v, ok := col.Data.(StateEncoder); ok {
w.ChainBuffer(v.EncodeState)
}
col.Data.WriteColumn(w)
}
return nil
}

// This constrains can prevent accidental OOM and allow early detection
// of erroneous column or row count.
//
Expand Down
8 changes: 8 additions & 0 deletions proto/cmd/ch-gen-col/safe.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,11 @@ func (c {{ .Type }}) EncodeColumn(b *Buffer) {
}
{{- end }}
}

func (c {{ .Type }}) WriteColumn(w *Writer) {
{{- if .Byte }}
w.ChainWrite([]byte(c))
{{- else }}
w.ChainBuffer(c.EncodeColumn)
{{- end }}
}
1 change: 1 addition & 0 deletions proto/cmd/ch-gen-col/test.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func Test{{ .Type }}_DecodeColumn(t *testing.T) {
var v {{ .Type }}
v.EncodeColumn(nil) // should be no-op
})
t.Run("WriteColumn", checkWriteColumn(data))
}

{{- if not .Time }}
Expand Down
29 changes: 29 additions & 0 deletions proto/cmd/ch-gen-col/unsafe.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,32 @@ func (c {{ .Type }}) EncodeColumn(b *Buffer) {
dst := b.Buf[offset:]
copy(dst, src)
}

func (c {{ .Type }}) WriteColumn(w *Writer) {
{{- if .DateTime }}
v := c.Data
{{- else }}
v := c
{{- end }}
if len(v) == 0 {
return
}
{{- if .SingleByte }}
src := *(*[]byte)(unsafe.Pointer(&v))
{{- else }}
{{- if .FixedStr }}
const size = {{ .Bytes }}
{{- else }}
const size = {{ .Bits }} / 8
{{- end }}

s := *(*slice)(unsafe.Pointer(&v))
{{- if not .SingleByte }}
s.Len *= size
s.Cap *= size
{{- end }}

src := *(*[]byte)(unsafe.Pointer(&s))
{{- end }}
w.ChainWrite(src)
}
6 changes: 6 additions & 0 deletions proto/col_arr.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ func (c ColArr[T]) EncodeColumn(b *Buffer) {
c.Data.EncodeColumn(b)
}

// WriteColumn implements ColInput.
func (c ColArr[T]) WriteColumn(w *Writer) {
c.Offsets.WriteColumn(w)
c.Data.WriteColumn(w)
}

// Append appends new row to column.
func (c *ColArr[T]) Append(v []T) {
c.Data.AppendArr(v)
Expand Down
3 changes: 3 additions & 0 deletions proto/col_arr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func testColumn[T any](t *testing.T, name string, f func() ColumnOf[T], values .
t.Run("Golden", func(t *testing.T) {
gold.Bytes(t, buf.Buf, "column_of_"+name)
})
t.Run("WriteColumn", checkWriteColumn(data))
t.Run("Ok", func(t *testing.T) {
br := bytes.NewReader(buf.Buf)
r := NewReader(br)
Expand Down Expand Up @@ -84,6 +85,7 @@ func TestColArrOfStr(t *testing.T) {
dec := (&ColStr{}).Array()
requireNoShortRead(t, buf.Buf, colAware(dec, col.Rows()))
})
t.Run("WriteColumn", checkWriteColumn(col))
}

func TestArrOfLowCordStr(t *testing.T) {
Expand Down Expand Up @@ -118,6 +120,7 @@ func TestArrOfLowCordStr(t *testing.T) {
dec := NewArray[string](new(ColStr).LowCardinality())
requireNoShortRead(t, buf.Buf, colAware(dec, col.Rows()))
})
t.Run("WriteColumn", checkWriteColumn(col))
}

func TestColArr_DecodeColumn(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions proto/col_auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,7 @@ func (c ColAuto) Reset() {
func (c ColAuto) EncodeColumn(b *Buffer) {
c.Data.EncodeColumn(b)
}

func (c ColAuto) WriteColumn(w *Writer) {
c.Data.WriteColumn(w)
}
8 changes: 8 additions & 0 deletions proto/col_bool_safe.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,11 @@ func (c *ColBool) DecodeColumn(r *Reader, rows int) error {
*c = v
return nil
}

// WriteColumn encodes ColBool rows to *Writer.
func (c ColBool) WriteColumn(w *Writer) {
if len(c) == 0 {
return
}
w.ChainBuffer(c.EncodeColumn)
}
1 change: 1 addition & 0 deletions proto/col_bool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestColBool_DecodeColumn(t *testing.T) {
var dec ColBool
requireNoShortRead(t, buf.Buf, colAware(&dec, rows))
})
t.Run("WriteColumn", checkWriteColumn(data))
}

func BenchmarkColBool_DecodeColumn(b *testing.B) {
Expand Down
10 changes: 10 additions & 0 deletions proto/col_bool_unsafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ func (c *ColBool) DecodeColumn(r *Reader, rows int) error {
}
return nil
}

// WriteColumn writes Bool rows to *Writer.
func (c ColBool) WriteColumn(w *Writer) {
if len(c) == 0 {
return
}
s := *(*slice)(unsafe.Pointer(&c)) // #nosec G103
src := *(*[]byte)(unsafe.Pointer(&s)) // #nosec G103
w.ChainWrite(src)
}
1 change: 1 addition & 0 deletions proto/col_date32_gen_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading