diff --git a/proto/col_arr.go b/proto/col_arr.go index ad574368..9b7639bf 100644 --- a/proto/col_arr.go +++ b/proto/col_arr.go @@ -130,6 +130,12 @@ func (c ColArr[T]) EncodeColumn(b *Buffer) { c.Data.EncodeColumn(b) } +// WriteColumn implements ColInput. +func (c ColArr[T]) WriteColumn(w *Writer) { + c.Offsets.WriteColumn(w) + c.Data.WriteColumn(w) +} + // Append appends new row to column. func (c *ColArr[T]) Append(v []T) { c.Data.AppendArr(v) diff --git a/proto/col_arr_test.go b/proto/col_arr_test.go index c4517f37..081a774d 100644 --- a/proto/col_arr_test.go +++ b/proto/col_arr_test.go @@ -23,6 +23,7 @@ func testColumn[T any](t *testing.T, name string, f func() ColumnOf[T], values . t.Run("Golden", func(t *testing.T) { gold.Bytes(t, buf.Buf, "column_of_"+name) }) + t.Run("WriteColumn", checkWriteColumn(data)) t.Run("Ok", func(t *testing.T) { br := bytes.NewReader(buf.Buf) r := NewReader(br) @@ -84,6 +85,7 @@ func TestColArrOfStr(t *testing.T) { dec := (&ColStr{}).Array() requireNoShortRead(t, buf.Buf, colAware(dec, col.Rows())) }) + t.Run("WriteColumn", checkWriteColumn(col)) } func TestArrOfLowCordStr(t *testing.T) { @@ -118,6 +120,7 @@ func TestArrOfLowCordStr(t *testing.T) { dec := NewArray[string](new(ColStr).LowCardinality()) requireNoShortRead(t, buf.Buf, colAware(dec, col.Rows())) }) + t.Run("WriteColumn", checkWriteColumn(col)) } func TestColArr_DecodeColumn(t *testing.T) { diff --git a/proto/col_auto.go b/proto/col_auto.go index 2ed9b8ea..bda616be 100644 --- a/proto/col_auto.go +++ b/proto/col_auto.go @@ -122,3 +122,7 @@ func (c ColAuto) Reset() { func (c ColAuto) EncodeColumn(b *Buffer) { c.Data.EncodeColumn(b) } + +func (c ColAuto) WriteColumn(w *Writer) { + c.Data.WriteColumn(w) +} diff --git a/proto/col_bool_test.go b/proto/col_bool_test.go index fba9716e..3ae07962 100644 --- a/proto/col_bool_test.go +++ b/proto/col_bool_test.go @@ -44,31 +44,7 @@ func TestColBool_DecodeColumn(t *testing.T) { var dec ColBool requireNoShortRead(t, buf.Buf, 
colAware(&dec, rows)) }) -} - -func TestColBool_byteSpan(t *testing.T) { - t.Parallel() - - const rows = 50 - var data ColBool - br, ok := any(data).(byteSpan) - if !ok || !br.isByteSpan() { - t.Skipf("%T could not be written as-is", data) - return - } - - for i := 0; i < rows; i++ { - data = append(data, (i%3) == 0) - } - - var expect Buffer - data.EncodeColumn(&expect) - - var got Buffer - for _, part := range any(data).(byteSpan).appendSlice(nil) { - got.Buf = append(got.Buf, part...) - } - require.Equal(t, expect.Buf, got.Buf) + t.Run("WriteColumn", checkWriteColumn(data)) } func BenchmarkColBool_DecodeColumn(b *testing.B) { diff --git a/proto/col_bool_unsafe.go b/proto/col_bool_unsafe.go index e1bf01ae..c42966ea 100644 --- a/proto/col_bool_unsafe.go +++ b/proto/col_bool_unsafe.go @@ -3,7 +3,6 @@ package proto import ( - "net" "unsafe" "github.com/go-faster/errors" @@ -36,12 +35,12 @@ func (c *ColBool) DecodeColumn(r *Reader, rows int) error { return nil } -func (ColBool) isByteSpan() bool { return true } - -func (c ColBool) appendSlice(buf net.Buffers) net.Buffers { +// WriteColumn writes Bool rows to *Writer. 
+func (c ColBool) WriteColumn(w *Writer) { if len(c) == 0 { - return buf + return } - src := *(*[]byte)(unsafe.Pointer(&c)) // #nosec G103 - return append(buf, src) + s := *(*slice)(unsafe.Pointer(&c)) // #nosec G103 + src := *(*[]byte)(unsafe.Pointer(&s)) // #nosec G103 + w.ChainWrite(src) } diff --git a/proto/col_enum.go b/proto/col_enum.go index f4af963b..a4b4f249 100644 --- a/proto/col_enum.go +++ b/proto/col_enum.go @@ -169,4 +169,8 @@ func (e *ColEnum) EncodeColumn(b *Buffer) { e.raw().EncodeColumn(b) } +func (e *ColEnum) WriteColumn(w *Writer) { + e.raw().WriteColumn(w) +} + func (e *ColEnum) Type() ColumnType { return e.t } diff --git a/proto/col_fixed_str.go b/proto/col_fixed_str.go index cb9f5510..7ae816e6 100644 --- a/proto/col_fixed_str.go +++ b/proto/col_fixed_str.go @@ -1,7 +1,6 @@ package proto import ( - "net" "strconv" "github.com/go-faster/errors" @@ -87,13 +86,9 @@ func (c *ColFixedStr) DecodeColumn(r *Reader, rows int) error { return nil } -func (ColFixedStr) isByteSpan() bool { return true } - -func (c ColFixedStr) appendSlice(buf net.Buffers) net.Buffers { - if len(c.Buf) == 0 { - return buf - } - return append(buf, c.Buf) +// WriteColumn writes ColFixedStr rows to *Writer. +func (c ColFixedStr) WriteColumn(w *Writer) { + w.ChainWrite(c.Buf) } // Array returns new Array(FixedString). 
diff --git a/proto/col_fixed_str_test.go b/proto/col_fixed_str_test.go index 549c99b3..4f072afe 100644 --- a/proto/col_fixed_str_test.go +++ b/proto/col_fixed_str_test.go @@ -86,32 +86,7 @@ func TestColFixedStr_EncodeColumn(t *testing.T) { require.Equal(t, 10, v.Size) }) }) -} - -func TestColFixedStr_byteSpan(t *testing.T) { - t.Parallel() - - var data ColFixedStr - for _, s := range []string{ - "foo", - "bar", - "ClickHouse", - "one", - "", - "1", - } { - h := sha256.Sum256([]byte(s)) - data.Append(h[:]) - } - - var expect Buffer - data.EncodeColumn(&expect) - - var got Buffer - for _, part := range any(data).(byteSpan).appendSlice(nil) { - got.Buf = append(got.Buf, part...) - } - require.Equal(t, expect.Buf, got.Buf) + t.Run("WriteColumn", checkWriteColumn(data)) } func BenchmarkColFixedStr_DecodeColumn(b *testing.B) { diff --git a/proto/col_interval.go b/proto/col_interval.go index 57bb2e39..c9ecb4c4 100644 --- a/proto/col_interval.go +++ b/proto/col_interval.go @@ -110,3 +110,7 @@ func (c *ColInterval) Reset() { func (c ColInterval) EncodeColumn(b *Buffer) { c.Values.EncodeColumn(b) } + +func (c ColInterval) WriteColumn(w *Writer) { + c.Values.WriteColumn(w) +} diff --git a/proto/col_interval_test.go b/proto/col_interval_test.go index 6972ff86..3d10bce1 100644 --- a/proto/col_interval_test.go +++ b/proto/col_interval_test.go @@ -132,4 +132,5 @@ func TestColInterval(t *testing.T) { var v ColInterval v.EncodeColumn(nil) // should be no-op }) + t.Run("WriteColumn", checkWriteColumn(data)) } diff --git a/proto/col_low_cardinality.go b/proto/col_low_cardinality.go index ffed5809..dd70f67a 100644 --- a/proto/col_low_cardinality.go +++ b/proto/col_low_cardinality.go @@ -230,6 +230,41 @@ func (c *ColLowCardinality[T]) EncodeColumn(b *Buffer) { } } +func (c *ColLowCardinality[T]) WriteColumn(w *Writer) { + // Using pointer receiver as Prepare() is expected to be called before + // encoding. + + if c.Rows() == 0 { + // Skipping encoding entirely. 
+ return + } + + w.ChainBuffer(func(b *Buffer) { + // Meta encodes whether reader should update + // low cardinality metadata and keys column type. + meta := cardinalityUpdateAll | int64(c.key) + b.PutInt64(meta) + + // Writing index (dictionary). + b.PutInt64(int64(c.index.Rows())) + }) + c.index.WriteColumn(w) + + w.ChainBuffer(func(b *Buffer) { + b.PutInt64(int64(c.Rows())) + }) + switch c.key { + case KeyUInt8: + c.keys8.WriteColumn(w) + case KeyUInt16: + c.keys16.WriteColumn(w) + case KeyUInt32: + c.keys32.WriteColumn(w) + case KeyUInt64: + c.keys64.WriteColumn(w) + } +} + func (c *ColLowCardinality[T]) Reset() { for k := range c.kv { delete(c.kv, k) diff --git a/proto/col_low_cardinality_raw.go b/proto/col_low_cardinality_raw.go index 665dc20c..99286345 100644 --- a/proto/col_low_cardinality_raw.go +++ b/proto/col_low_cardinality_raw.go @@ -155,3 +155,28 @@ func (c ColLowCardinalityRaw) EncodeColumn(b *Buffer) { b.PutInt64(int64(k.Rows())) k.EncodeColumn(b) } + +func (c ColLowCardinalityRaw) WriteColumn(w *Writer) { + if c.Rows() == 0 { + // Skipping encoding entirely. + return + } + + w.ChainBuffer(func(b *Buffer) { + // Meta encodes whether reader should update + // low cardinality metadata and keys column type. + meta := cardinalityUpdateAll | int64(c.Key) + b.PutInt64(meta) + + // Writing index (dictionary). + b.PutInt64(int64(c.Index.Rows())) + }) + c.Index.WriteColumn(w) + + // Sequence of values as indexes in dictionary. 
+ k := c.Keys() + w.ChainBuffer(func(b *Buffer) { + b.PutInt64(int64(k.Rows())) + }) + k.WriteColumn(w) +} diff --git a/proto/col_low_cardinality_raw_test.go b/proto/col_low_cardinality_raw_test.go index 00416653..6f04a2dc 100644 --- a/proto/col_low_cardinality_raw_test.go +++ b/proto/col_low_cardinality_raw_test.go @@ -60,6 +60,7 @@ func TestColLowCardinalityRaw_DecodeColumn(t *testing.T) { } requireNoShortRead(t, buf.Buf, colAware(dec, rows)) }) + t.Run("WriteColumn", checkWriteColumn(col)) }) t.Run("Blank", func(t *testing.T) { // Blank columns (i.e. row count is zero) are not encoded. diff --git a/proto/col_low_cardinality_test.go b/proto/col_low_cardinality_test.go index 43ec93ed..a432d188 100644 --- a/proto/col_low_cardinality_test.go +++ b/proto/col_low_cardinality_test.go @@ -93,6 +93,7 @@ func TestArrLowCardinalityStr(t *testing.T) { dec := new(ColStr).LowCardinality().Array() requireNoShortRead(t, buf.Buf, colAware(dec, rows)) }) + t.Run("WriteColumn", checkWriteColumn(col)) } func TestColLowCardinality_DecodeColumn(t *testing.T) { diff --git a/proto/col_map.go b/proto/col_map.go index 90925fb2..956a9a8b 100644 --- a/proto/col_map.go +++ b/proto/col_map.go @@ -164,6 +164,16 @@ func (c ColMap[K, V]) EncodeColumn(b *Buffer) { c.Values.EncodeColumn(b) } +func (c ColMap[K, V]) WriteColumn(w *Writer) { + if c.Rows() == 0 { + return + } + + c.Offsets.WriteColumn(w) + c.Keys.WriteColumn(w) + c.Values.WriteColumn(w) +} + // Prepare ensures Preparable column propagation. 
func (c ColMap[K, V]) Prepare() error { if v, ok := c.Keys.(Preparable); ok { diff --git a/proto/col_nothing.go b/proto/col_nothing.go index 1a825091..d72eeba6 100644 --- a/proto/col_nothing.go +++ b/proto/col_nothing.go @@ -71,3 +71,7 @@ func (c ColNothing) EncodeColumn(b *Buffer) { } b.PutRaw(make([]byte, c)) } + +func (c ColNothing) WriteColumn(w *Writer) { + w.ChainBuffer(c.EncodeColumn) +} diff --git a/proto/col_nothing_test.go b/proto/col_nothing_test.go index 380c555f..0d2b722c 100644 --- a/proto/col_nothing_test.go +++ b/proto/col_nothing_test.go @@ -61,4 +61,5 @@ func TestColNothing(t *testing.T) { var v ColNothing v.EncodeColumn(nil) // should be no-op }) + t.Run("WriteColumn", checkWriteColumn(ColNothing(0))) } diff --git a/proto/col_nullable.go b/proto/col_nullable.go index 516245f0..dfac8fa2 100644 --- a/proto/col_nullable.go +++ b/proto/col_nullable.go @@ -127,6 +127,11 @@ func (c ColNullable[T]) EncodeColumn(b *Buffer) { c.Values.EncodeColumn(b) } +func (c ColNullable[T]) WriteColumn(w *Writer) { + c.Nulls.WriteColumn(w) + c.Values.WriteColumn(w) +} + func (c ColNullable[T]) IsElemNull(i int) bool { if i < c.Rows() { return c.Nulls[i] == boolTrue diff --git a/proto/col_point.go b/proto/col_point.go index 0e1549ff..5d7834f8 100644 --- a/proto/col_point.go +++ b/proto/col_point.go @@ -61,3 +61,8 @@ func (c ColPoint) EncodeColumn(b *Buffer) { c.X.EncodeColumn(b) c.Y.EncodeColumn(b) } + +func (c ColPoint) WriteColumn(w *Writer) { + c.X.WriteColumn(w) + c.Y.WriteColumn(w) +} diff --git a/proto/col_raw_of.go b/proto/col_raw_of.go index 325a17b3..d56e3571 100644 --- a/proto/col_raw_of.go +++ b/proto/col_raw_of.go @@ -82,3 +82,17 @@ func (c *ColRawOf[X]) DecodeColumn(r *Reader, rows int) error { } return nil } + +// WriteColumn writes ColRawOf rows to *Writer. 
+func (c ColRawOf[X]) WriteColumn(w *Writer) { + if len(c) == 0 { + return + } + var x X + size := unsafe.Sizeof(x) // #nosec G103 + s := *(*slice)(unsafe.Pointer(&c)) // #nosec G103 + s.Len *= size + s.Cap *= size + src := *(*[]byte)(unsafe.Pointer(&s)) // #nosec G103 + w.ChainWrite(src) +} diff --git a/proto/col_str.go b/proto/col_str.go index 8f48ad7f..108c08e9 100644 --- a/proto/col_str.go +++ b/proto/col_str.go @@ -76,6 +76,18 @@ func (c ColStr) EncodeColumn(b *Buffer) { } } +// WriteColumn writes String rows to *Writer. +func (c ColStr) WriteColumn(w *Writer) { + buf := make([]byte, binary.MaxVarintLen64) + for _, p := range c.Pos { + w.ChainBuffer(func(b *Buffer) { + n := binary.PutUvarint(buf, uint64(p.End-p.Start)) + b.PutRaw(buf[:n]) + }) + w.ChainWrite(c.Buf[p.Start:p.End]) + } +} + // ForEach calls f on each string from column. func (c ColStr) ForEach(f func(i int, s string) error) error { return c.ForEachBytes(func(i int, b []byte) error { diff --git a/proto/col_tuple.go b/proto/col_tuple.go index ac7ad6c8..7ee1bef0 100644 --- a/proto/col_tuple.go +++ b/proto/col_tuple.go @@ -167,3 +167,9 @@ func (c ColTuple) EncodeColumn(b *Buffer) { v.EncodeColumn(b) } } + +func (c ColTuple) WriteColumn(w *Writer) { + for _, v := range c { + v.WriteColumn(w) + } +} diff --git a/proto/col_uuid_unsafe.go b/proto/col_uuid_unsafe.go index 18fa73fc..877bc4d4 100644 --- a/proto/col_uuid_unsafe.go +++ b/proto/col_uuid_unsafe.go @@ -47,3 +47,12 @@ func (c ColUUID) EncodeColumn(b *Buffer) { copy(dst, src) bswap.Swap64(dst) // BE <-> LE } + +// WriteColumn encodes ColUUID rows to *Writer. +func (c ColUUID) WriteColumn(w *Writer) { + if len(c) == 0 { + return + } + // Can't write UUID as-is: bswap is required. + w.ChainBuffer(c.EncodeColumn) +}