diff --git a/chcol.go b/chcol.go index 5b4fd0f127..9d2d339d3f 100644 --- a/chcol.go +++ b/chcol.go @@ -24,6 +24,7 @@ import "github.com/ClickHouse/clickhouse-go/v2/lib/chcol" type ( Variant = chcol.Variant Dynamic = chcol.Dynamic + JSON = chcol.JSON ) // NewVariant creates a new Variant with the given value @@ -45,3 +46,8 @@ func NewDynamic(v any) Dynamic { func NewDynamicWithType(v any, chType string) Dynamic { return chcol.NewDynamicWithType(v, chType) } + +// NewJSON creates a new empty JSON value +func NewJSON() *JSON { + return chcol.NewJSON() +} diff --git a/examples/clickhouse_api/json_paths.go b/examples/clickhouse_api/json_paths.go new file mode 100644 index 0000000000..b260f3bbc7 --- /dev/null +++ b/examples/clickhouse_api/json_paths.go @@ -0,0 +1,86 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package clickhouse_api + +import ( + "context" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "time" +) + +func JSONPathsExample() error { + ctx := context.Background() + + conn, err := GetNativeConnection(clickhouse.Settings{ + "allow_experimental_json_type": true, + }, nil, nil) + if err != nil { + return err + } + + if !CheckMinServerVersion(conn, 24, 9, 0) { + fmt.Print("unsupported clickhouse version for JSON type") + return nil + } + + err = conn.Exec(ctx, "DROP TABLE IF EXISTS go_json_example") + if err != nil { + return err + } + + err = conn.Exec(ctx, ` + CREATE TABLE go_json_example (product JSON) ENGINE=Memory + `) + if err != nil { + return err + } + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO go_json_example (product)") + if err != nil { + return err + } + + insertProduct := clickhouse.NewJSON() + insertProduct.SetValueAtPath("id", clickhouse.NewDynamicWithType(uint64(1234), "UInt64")) + insertProduct.SetValueAtPath("name", "Book") + insertProduct.SetValueAtPath("tags", []string{"library", "fiction"}) + insertProduct.SetValueAtPath("pricing.price", int64(750)) + insertProduct.SetValueAtPath("pricing.currency", "usd") + insertProduct.SetValueAtPath("metadata.region", "us") + insertProduct.SetValueAtPath("metadata.page_count", int64(852)) + insertProduct.SetValueAtPath("created_at", clickhouse.NewDynamicWithType(time.Now().UTC().Truncate(time.Millisecond), "DateTime64(3)")) + + if err = batch.Append(insertProduct); err != nil { + return err + } + + if err = batch.Send(); err != nil { + return err + } + + var selectedProduct clickhouse.JSON + + if err = conn.QueryRow(ctx, "SELECT product FROM go_json_example").Scan(&selectedProduct); err != nil { + return err + } + + fmt.Printf("inserted product: %+v\n", insertProduct) + fmt.Printf("selected product: %+v\n", selectedProduct) + return nil +} diff --git a/examples/clickhouse_api/json_strings.go b/examples/clickhouse_api/json_strings.go new file mode 100644 index 
0000000000..8b0ad2bb32 --- /dev/null +++ b/examples/clickhouse_api/json_strings.go @@ -0,0 +1,81 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package clickhouse_api + +import ( + "context" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" +) + +func JSONStringExample() error { + ctx := context.Background() + + conn, err := GetNativeConnection(clickhouse.Settings{ + "allow_experimental_json_type": true, + "output_format_native_write_json_as_string": true, + }, nil, nil) + if err != nil { + return err + } + + if !CheckMinServerVersion(conn, 24, 9, 0) { + fmt.Print("unsupported clickhouse version for JSON type") + return nil + } + + err = conn.Exec(ctx, "DROP TABLE IF EXISTS go_json_example") + if err != nil { + return err + } + + err = conn.Exec(ctx, ` + CREATE TABLE go_json_example (product JSON) ENGINE=Memory + `) + if err != nil { + return err + } + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO go_json_example (product)") + if err != nil { + return err + } + + insertProductString := "{\"id\":1234,\"name\":\"Book\",\"tags\":[\"library\",\"fiction\"]," + + "\"pricing\":{\"price\":750,\"currency\":\"usd\"},\"metadata\":{\"page_count\":852,\"region\":\"us\"}," + + "\"created_at\":\"2024-12-19T11:20:04.146Z\"}" + + if err = batch.Append(insertProductString); err != nil { + return err + } + + if err = batch.Send(); err != nil { + return err + } + + var selectedProductString string + + if err = conn.QueryRow(ctx, "SELECT product FROM go_json_example").Scan(&selectedProductString); err != nil { + return err + } + + fmt.Printf("inserted product string: %s\n", insertProductString) + fmt.Printf("selected product string: %s\n", selectedProductString) + fmt.Printf("inserted product string matches selected product string: %t\n", insertProductString == selectedProductString) + return nil +} diff --git a/examples/clickhouse_api/json_structs.go b/examples/clickhouse_api/json_structs.go new file mode 100644 index 0000000000..cdfeabe067 --- /dev/null +++ b/examples/clickhouse_api/json_structs.go @@ -0,0 +1,121 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package clickhouse_api + +import ( + "context" + "encoding/json" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "time" +) + +type ProductPricing struct { + Price int64 `json:"price"` + Currency string `json:"currency"` +} + +type Product struct { + ID clickhouse.Dynamic `json:"id"` + Name string `json:"name"` + Tags []string `json:"tags"` + Pricing ProductPricing `json:"pricing"` + Metadata map[string]interface{} `json:"metadata"` + CreatedAt time.Time `json:"created_at" chType:"DateTime64(3)"` +} + +func NewExampleProduct() *Product { + return &Product{ + ID: clickhouse.NewDynamicWithType(uint64(1234), "UInt64"), + Name: "Book", + Tags: []string{"library", "fiction"}, + Pricing: ProductPricing{ + Price: 750, + Currency: "usd", + }, + Metadata: map[string]interface{}{ + "region": "us", + "page_count": int64(852), + }, + CreatedAt: time.Now().UTC().Truncate(time.Millisecond), + } +} + +func JSONStructExample() error { + ctx := context.Background() + + conn, err := GetNativeConnection(clickhouse.Settings{ + "allow_experimental_json_type": true, + }, nil, nil) + if err != nil { + return err + } + + if !CheckMinServerVersion(conn, 24, 9, 0) { + fmt.Print("unsupported clickhouse version for JSON type") + return nil + } + + err = conn.Exec(ctx, "DROP TABLE IF EXISTS go_json_example") + if err != nil { + return err + } + + err = conn.Exec(ctx, ` + CREATE TABLE go_json_example (product JSON) ENGINE=Memory + `) + if err != nil { + return err + } + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO go_json_example (product)") + if err != nil { + return err + } + + insertProduct := NewExampleProduct() + + if err = batch.Append(insertProduct); err != nil { + return err + } + + if err = batch.Send(); err != nil { + return err + } + + var selectedProduct Product + + if err = conn.QueryRow(ctx, "SELECT product FROM go_json_example").Scan(&selectedProduct); err != nil { + return err + } + + insertProductBytes, err := json.Marshal(insertProduct) + if err != nil { + return err + } + + selectedProductBytes, err := json.Marshal(&selectedProduct) + if err != nil { + return err + } + + fmt.Printf("inserted product: %s\n", string(insertProductBytes)) + fmt.Printf("selected product: %s\n", string(selectedProductBytes)) + fmt.Printf("inserted product matches selected product: %t\n", string(insertProductBytes) == string(selectedProductBytes)) + return nil +} diff --git a/examples/clickhouse_api/main_test.go b/examples/clickhouse_api/main_test.go index d1b15637d9..e5ce00d3b5 100644 --- a/examples/clickhouse_api/main_test.go +++ b/examples/clickhouse_api/main_test.go @@ -217,3 +217,16 @@ func TestVariantExample(t *testing.T) { func TestDynamicExample(t *testing.T) { require.NoError(t, DynamicExample()) } + +func TestJSONPathsExample(t *testing.T) { + require.NoError(t, JSONPathsExample()) +} + +func TestJSONStructExample(t *testing.T) { + require.NoError(t, JSONStructExample()) +} + +func TestJSONStringExample(t *testing.T) { + t.Skip("client cannot receive JSON strings") + require.NoError(t, JSONStringExample()) +} diff --git a/examples/std/json_paths.go b/examples/std/json_paths.go new file mode 100644 index 0000000000..8b9218ad97 --- /dev/null +++ b/examples/std/json_paths.go @@ -0,0 +1,100 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package std + +import ( + "context" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "time" +) + +func JSONPathsExample() error { + ctx := context.Background() + + conn, err := GetStdOpenDBConnection(clickhouse.Native, nil, nil, nil) + if err != nil { + return err + } + + if !CheckMinServerVersion(conn, 24, 9, 0) { + fmt.Print("unsupported clickhouse version for JSON type") + return nil + } + + _, err = conn.ExecContext(ctx, "SET allow_experimental_json_type = 1") + if err != nil { + return err + } + + defer func() { + conn.Exec("DROP TABLE go_json_example") + }() + + _, err = conn.ExecContext(ctx, "DROP TABLE IF EXISTS go_json_example") + if err != nil { + return err + } + + _, err = conn.ExecContext(ctx, ` + CREATE TABLE go_json_example ( + product JSON + ) ENGINE = Memory + `) + if err != nil { + return err + } + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + + batch, err := tx.PrepareContext(ctx, "INSERT INTO go_json_example (product)") + if err != nil { + return err + } + + insertProduct := clickhouse.NewJSON() + insertProduct.SetValueAtPath("id", clickhouse.NewDynamicWithType(uint64(1234), "UInt64")) + insertProduct.SetValueAtPath("name", "Book") + insertProduct.SetValueAtPath("tags", []string{"library", "fiction"}) + insertProduct.SetValueAtPath("pricing.price", int64(750)) + insertProduct.SetValueAtPath("pricing.currency", "usd") + insertProduct.SetValueAtPath("metadata.region", "us") + insertProduct.SetValueAtPath("metadata.page_count", int64(852)) + insertProduct.SetValueAtPath("created_at", clickhouse.NewDynamicWithType(time.Now().UTC().Truncate(time.Millisecond), "DateTime64(3)")) + + if _, err = batch.ExecContext(ctx, insertProduct); err != nil { + return err + } + + if err = tx.Commit(); err != nil { + return err + } + + var selectedProduct clickhouse.JSON + + if err = conn.QueryRowContext(ctx, "SELECT product FROM go_json_example").Scan(&selectedProduct); err != nil { + return err + } + + fmt.Printf("inserted product: %+v\n", insertProduct) + fmt.Printf("selected product: %+v\n", selectedProduct) + return nil +} diff --git a/examples/std/json_strings.go b/examples/std/json_strings.go new file mode 100644 index 0000000000..674f729cf4 --- /dev/null +++ b/examples/std/json_strings.go @@ -0,0 +1,99 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package std + +import ( + "context" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" +) + +func JSONStringExample() error { + ctx := context.Background() + + conn, err := GetStdOpenDBConnection(clickhouse.Native, nil, nil, nil) + if err != nil { + return err + } + + if !CheckMinServerVersion(conn, 24, 9, 0) { + fmt.Print("unsupported clickhouse version for JSON type") + return nil + } + + _, err = conn.ExecContext(ctx, "SET allow_experimental_json_type = 1") + if err != nil { + return err + } + + _, err = conn.ExecContext(ctx, "SET output_format_native_write_json_as_string = 1") + if err != nil { + return err + } + + defer func() { + conn.Exec("DROP TABLE go_json_example") + }() + + _, err = conn.ExecContext(ctx, "DROP TABLE IF EXISTS go_json_example") + if err != nil { + return err + } + + _, err = conn.ExecContext(ctx, ` + CREATE TABLE go_json_example ( + product JSON + ) ENGINE = Memory + `) + if err != nil { + return err + } + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + + batch, err := tx.PrepareContext(ctx, "INSERT INTO go_json_example (product)") + if err != nil { + return err + } + + insertProductString := "{\"id\":1234,\"name\":\"Book\",\"tags\":[\"library\",\"fiction\"]," + + "\"pricing\":{\"price\":750,\"currency\":\"usd\"},\"metadata\":{\"page_count\":852,\"region\":\"us\"}," + + "\"created_at\":\"2024-12-19T11:20:04.146Z\"}" + + if _, err = batch.ExecContext(ctx, insertProductString); err != nil { + return err + } + + if err = tx.Commit(); err != nil { + return err + } + + var selectedProductString string + + if err = conn.QueryRowContext(ctx, "SELECT product FROM go_json_example").Scan(&selectedProductString); err != nil { + return err + } + + fmt.Printf("inserted product string: %s\n", insertProductString) + fmt.Printf("selected product string: %s\n", selectedProductString) + fmt.Printf("inserted product string matches selected product string: %t\n", insertProductString == selectedProductString) + return nil +} diff --git a/examples/std/main_test.go b/examples/std/main_test.go index c1995c46fe..20e5552e0e 100644 --- a/examples/std/main_test.go +++ b/examples/std/main_test.go @@ -156,3 +156,12 @@ func TestVariantExample(t *testing.T) { func TestDynamicExample(t *testing.T) { require.NoError(t, DynamicExample()) } + +func TestJSONPathsExample(t *testing.T) { + require.NoError(t, JSONPathsExample()) +} + +func TestJSONStringExample(t *testing.T) { + t.Skip("client cannot receive JSON strings") + require.NoError(t, JSONStringExample()) +} diff --git a/lib/chcol/json.go b/lib/chcol/json.go new file mode 100644 index 0000000000..a43dd23502 --- /dev/null +++ b/lib/chcol/json.go @@ -0,0 +1,100 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package chcol + +import ( + "database/sql/driver" + "encoding/json" + "fmt" + "strings" +) + +// JSON represents a ClickHouse JSON type that can hold multiple possible types +type JSON struct { + valuesByPath map[string]any +} + +// NewJSON creates a new empty JSON value +func NewJSON() *JSON { + return &JSON{ + valuesByPath: make(map[string]any), + } +} + +func (o *JSON) ValuesByPath() map[string]any { + return o.valuesByPath +} + +func (o *JSON) SetValueAtPath(path string, value any) { + o.valuesByPath[path] = value +} + +func (o *JSON) ValueAtPath(path string) (any, bool) { + value, ok := o.valuesByPath[path] + return value, ok +} + +// NestedMap converts the flattened JSON data into a nested structure +func (o *JSON) NestedMap() map[string]any { + nested := make(map[string]any) + + for key, value := range o.valuesByPath { + parts := strings.Split(key, ".") + current := nested + + for i := 0; i < len(parts)-1; i++ { + part := parts[i] + + if _, exists := current[part]; !exists { + current[part] = make(map[string]any) + } + + current = current[part].(map[string]any) + } + + current[parts[len(parts)-1]] = value + } + + return nested +} + +// MarshalJSON implements the json.Marshaler interface +func (o JSON) MarshalJSON() ([]byte, error) { + return json.Marshal(o.NestedMap()) +} + +// Scan implements the sql.Scanner interface +func (o *JSON) Scan(value interface{}) error { + switch vv := value.(type) { + case JSON: + o.valuesByPath = vv.valuesByPath + case *JSON: + o.valuesByPath = vv.valuesByPath + case map[string]any: + o.valuesByPath = vv + default: + return fmt.Errorf("JSON Scan value must be clickhouse.JSON or map[string]any") + } + + return nil +} + +// Value implements the driver.Valuer interface +func (o *JSON) Value() (driver.Value, error) { + return o, nil +} diff --git a/lib/chcol/variant.go b/lib/chcol/variant.go index 5a5adee473..ac87396929 100644 --- a/lib/chcol/variant.go +++ b/lib/chcol/variant.go @@ -72,11 +72,6 @@ func (v Variant) Any() any { return v.value } -// MarshalJSON implements the json.Marshaler interface -func (v *Variant) MarshalJSON() ([]byte, error) { - return json.Marshal(v.value) -} - // Scan implements the sql.Scanner interface func (v *Variant) Scan(value interface{}) error { switch vv := value.(type) { @@ -97,3 +92,55 @@ func (v *Variant) Scan(value interface{}) error { func (v Variant) Value() (driver.Value, error) { return v, nil } + +// MarshalJSON implements the json.Marshaler interface +func (v Variant) MarshalJSON() ([]byte, error) { + if v.Nil() { + return []byte("null"), nil + } + + return json.Marshal(v.value) +} + +// UnmarshalJSON implements the json.Unmarshaler interface +func (v *Variant) UnmarshalJSON(data []byte) error { + if string(data) == "null" { + v.value = nil + return nil + } + + if err := json.Unmarshal(data, &v.value); err != nil { + return err + } + + return nil +} + +// MarshalText implements the encoding.TextMarshaler interface +func (v Variant) MarshalText() ([]byte, error) { + if v.Nil() { + return []byte(""), nil + } + + switch vv := v.value.(type) { + case string: + return []byte(vv), nil + case 
[]byte: + return vv, nil + case json.RawMessage: + return vv, nil + } + + return json.Marshal(v.value) +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface +func (v *Variant) UnmarshalText(text []byte) error { + if len(text) == 0 { + v.value = nil + return nil + } + + v.value = string(text) + return nil +} diff --git a/lib/column/codegen/column.tpl b/lib/column/codegen/column.tpl index 78eed58c56..a35275c4f2 100644 --- a/lib/column/codegen/column.tpl +++ b/lib/column/codegen/column.tpl @@ -135,8 +135,10 @@ func (t Type) Column(name string, tz *time.Location) (Interface, error) { return (&Tuple{name: name}).parse(t, tz) case strings.HasPrefix(string(t), "Variant("): return (&Variant{name: name}).parse(t, tz) - case strings.HasPrefix(string(t), "Dynamic"): - return (&Dynamic{name: name}).parse(t, tz) + case strings.HasPrefix(string(t), "Dynamic"): + return (&Dynamic{name: name}).parse(t, tz) + case strings.HasPrefix(string(t), "JSON"): + return (&JSON{name: name}).parse(t, tz) case strings.HasPrefix(string(t), "Decimal("): return (&Decimal{name: name}).parse(t) case strings.HasPrefix(strType, "Nested("): @@ -199,7 +201,8 @@ var ( scanTypeDecimal = reflect.TypeOf(decimal.Decimal{}) scanTypeMultiPolygon = reflect.TypeOf(orb.MultiPolygon{}) scanTypeVariant = reflect.TypeOf(chcol.Variant{}) - scanTypeDynamic = reflect.TypeOf(chcol.Dynamic{}) + scanTypeDynamic = reflect.TypeOf(chcol.Dynamic{}) + scanTypeJSON = reflect.TypeOf(chcol.JSON{}) ) {{- range . }} diff --git a/lib/column/column_gen.go b/lib/column/column_gen.go index cc601212f5..439f71a60c 100644 --- a/lib/column/column_gen.go +++ b/lib/column/column_gen.go @@ -153,6 +153,8 @@ func (t Type) Column(name string, tz *time.Location) (Interface, error) { return (&Variant{name: name}).parse(t, tz) case strings.HasPrefix(string(t), "Dynamic"): return (&Dynamic{name: name}).parse(t, tz) + case strings.HasPrefix(string(t), "JSON"): + return (&JSON{name: name}).parse(t, tz) case strings.HasPrefix(string(t), "Decimal("): return (&Decimal{name: name}).parse(t) case strings.HasPrefix(strType, "Nested("): @@ -264,6 +266,7 @@ var ( scanTypeMultiPolygon = reflect.TypeOf(orb.MultiPolygon{}) scanTypeVariant = reflect.TypeOf(chcol.Variant{}) scanTypeDynamic = reflect.TypeOf(chcol.Dynamic{}) + scanTypeJSON = reflect.TypeOf(chcol.JSON{}) ) func (col *Float32) Name() string { diff --git a/lib/column/json.go b/lib/column/json.go index d00d5f5ca2..0798cfaa40 100644 --- a/lib/column/json.go +++ b/lib/column/json.go @@ -19,935 +19,711 @@ package column import ( "fmt" + "github.com/ClickHouse/clickhouse-go/v2/lib/chcol" + "math" "reflect" + "strconv" "strings" "time" "github.com/ClickHouse/ch-go/proto" ) -// This JSON type implementation was done for an experimental Object('JSON') type: -// https://clickhouse.com/docs/en/sql-reference/data-types/object-data-type -// It's already deprecated in ClickHouse and will be removed in the future. -// Since ClickHouse 24.8, the Object('JSON') type is no longer alias for JSON type. -// The new JSON type has been introduced: https://clickhouse.com/docs/en/sql-reference/data-types/newjson -// However, the new JSON type is not supported by the driver yet. -// -// This implementation is kept for backward compatibility and will be removed in the future. 
TODO: remove this - -// inverse mapping - go types to clickhouse types -var kindMappings = map[reflect.Kind]string{ - reflect.String: "String", - reflect.Int: "Int64", - reflect.Int8: "Int8", - reflect.Int16: "Int16", - reflect.Int32: "Int32", - reflect.Int64: "Int64", - reflect.Uint: "UInt64", - reflect.Uint8: "UInt8", - reflect.Uint16: "UInt16", - reflect.Uint32: "UInt32", - reflect.Uint64: "UInt64", - reflect.Float32: "Float32", - reflect.Float64: "Float64", - reflect.Bool: "Bool", -} - -// complex types for which a mapping exists - currently we map to String but could enhance in the future for other types -var typeMappings = map[string]struct{}{ - // currently JSON doesn't support DateTime, Decimal or IP so mapped to String - "time.Time": {}, - "decimal.Decimal": {}, - "net.IP": {}, - "uuid.UUID": {}, -} - -type JSON interface { - Interface - appendEmptyValue() error -} - -type JSONParent interface { - upsertValue(name string, ct string) (*JSONValue, error) - upsertList(name string) (*JSONList, error) - upsertObject(name string) (*JSONObject, error) - insertEmptyColumn(name string) error - columnNames() []string - rows() int -} - -func parseType(name string, vType reflect.Type, values any, isArray bool, jCol JSONParent, numEmpty int) error { - _, ok := typeMappings[vType.String()] - if !ok { - return &UnsupportedColumnTypeError{ - t: Type(vType.String()), - } - } - ct := "String" - if isArray { - ct = fmt.Sprintf("Array(%s)", ct) - } - col, err := jCol.upsertValue(name, ct) - if err != nil { - return err - } - col.origType = vType +const JSONObjectSerializationVersion uint64 = 0 +const JSONStringSerializationVersion uint64 = 1 +const JSONUnsetSerializationVersion uint64 = math.MaxUint64 +const DefaultMaxDynamicPaths = 1024 - //pre pad with empty - e.g. for new values in maps - for i := 0; i < numEmpty; i++ { - if isArray { - // empty array for nil of the right type - err = col.AppendRow([]string{}) - } else { - // empty value of the type - err = col.AppendRow(fmt.Sprint(reflect.New(vType).Elem().Interface())) - } - if err != nil { - return err - } - } - if isArray { - iValues := reflect.ValueOf(values) - sValues := make([]string, iValues.Len(), iValues.Len()) - for i := 0; i < iValues.Len(); i++ { - sValues[i] = fmt.Sprint(iValues.Index(i).Interface()) - } - return col.AppendRow(sValues) - } - return col.AppendRow(fmt.Sprint(values)) +type JSON struct { + chType Type + tz *time.Location + name string + rows int + + serializationVersion uint64 + + jsonStrings String + + typedPaths []string + typedPathsIndex map[string]int + typedColumns []Interface + + skipPaths []string + skipPathsIndex map[string]int + + dynamicPaths []string + dynamicPathsIndex map[string]int + dynamicColumns []*Dynamic + + maxDynamicPaths int + maxDynamicTypes int + totalDynamicPaths int } -func parsePrimitive(name string, kind reflect.Kind, values any, isArray bool, jCol JSONParent, numEmpty int) error { - ct, ok := kindMappings[kind] - if !ok { - return &UnsupportedColumnTypeError{ - t: Type(fmt.Sprintf("%s - %s", kind, reflect.TypeOf(values).String())), - } - } - var err error - if isArray { - ct = fmt.Sprintf("Array(%s)", ct) - // if we have a []any we will need to cast to the target column type - this will be based on the first - // values types. Inconsistent slices will fail. 
- values, err = convertSlice(values) - if err != nil { - return err - } +func (c *JSON) parse(t Type, tz *time.Location) (_ *JSON, err error) { + c.chType = t + c.tz = tz + tStr := string(t) + + c.serializationVersion = JSONUnsetSerializationVersion + c.typedPathsIndex = make(map[string]int) + c.skipPathsIndex = make(map[string]int) + c.dynamicPathsIndex = make(map[string]int) + c.maxDynamicPaths = DefaultMaxDynamicPaths + c.maxDynamicTypes = DefaultMaxDynamicTypes + + if tStr == "JSON" { + return c, nil } - col, err := jCol.upsertValue(name, ct) - if err != nil { - return err + + if !strings.HasPrefix(tStr, "JSON(") || !strings.HasSuffix(tStr, ")") { + return nil, &UnsupportedColumnTypeError{t: t} } - //pre pad with empty - e.g. for new values in maps - for i := 0; i < numEmpty; i++ { - if isArray { - // empty array for nil of the right type - err = col.AppendRow(reflect.MakeSlice(reflect.TypeOf(values), 0, 0).Interface()) - } else { - err = col.AppendRow(nil) + typePartsStr := strings.TrimPrefix(tStr, "JSON(") + typePartsStr = strings.TrimSuffix(typePartsStr, ")") + + typeParts := splitWithDelimiters(typePartsStr) + for _, typePart := range typeParts { + typePart = strings.TrimSpace(typePart) + + if strings.HasPrefix(typePart, "max_dynamic_paths=") { + v := strings.TrimPrefix(typePart, "max_dynamic_paths=") + if maxPaths, err := strconv.Atoi(v); err == nil { + c.maxDynamicPaths = maxPaths + } + + continue } - if err != nil { - return err + + if strings.HasPrefix(typePart, "max_dynamic_types=") { + v := strings.TrimPrefix(typePart, "max_dynamic_types=") + if maxTypes, err := strconv.Atoi(v); err == nil { + c.maxDynamicTypes = maxTypes + } + + continue } - } - return col.AppendRow(values) -} + if strings.HasPrefix(typePart, "SKIP REGEXP") { + pattern := strings.TrimPrefix(typePart, "SKIP REGEXP") + pattern = strings.Trim(pattern, " '") + c.skipPaths = append(c.skipPaths, pattern) + c.skipPathsIndex[pattern] = len(c.skipPaths) - 1 -// converts a []any of primitives to a typed slice -// maybe this can be done with reflection but likely slower. investigate. -// this uses the first value to determine the type - subsequent values must currently be of the same type - we might cast later -// but wider driver doesn't support e.g. 
int to int64 -func convertSlice(values any) (any, error) { - rValues := reflect.ValueOf(values) - if rValues.Len() == 0 || rValues.Index(0).Kind() != reflect.Interface { - return values, nil - } - var fType reflect.Type - for i := 0; i < rValues.Len(); i++ { - elem := rValues.Index(i).Elem() - if elem.IsValid() { - fType = elem.Type() - break + continue } - } - if fType == nil { - return []any{}, nil - } - typedSlice := reflect.MakeSlice(reflect.SliceOf(fType), 0, rValues.Len()) - for i := 0; i < rValues.Len(); i++ { - value := rValues.Index(i) - if value.IsNil() { - typedSlice = reflect.Append(typedSlice, reflect.Zero(fType)) + + if strings.HasPrefix(typePart, "SKIP") { + path := strings.TrimPrefix(typePart, "SKIP") + path = strings.Trim(path, " `") + c.skipPaths = append(c.skipPaths, path) + c.skipPathsIndex[path] = len(c.skipPaths) - 1 + continue } - if rValues.Index(i).Elem().Type() != fType { - return nil, &Error{ - ColumnType: fmt.Sprint(fType), - Err: fmt.Errorf("inconsistent slices are not supported - expected %s got %s", fType, rValues.Index(i).Elem().Type()), - } + + typedPathParts := strings.SplitN(typePart, " ", 2) + if len(typedPathParts) != 2 { + continue } - typedSlice = reflect.Append(typedSlice, rValues.Index(i).Elem()) - } - return typedSlice.Interface(), nil -} -func (jCol *JSONList) createNewOffsets(num int) { - for i := 0; i < num; i++ { - //single depth so can take 1st - if jCol.offsets[0].values.col.Rows() == 0 { - // first entry in the column - jCol.offsets[0].values.col.Append(0) - } else { - // entry for this object to see offset from last - offsets are cumulative - jCol.offsets[0].values.col.Append(jCol.offsets[0].values.col.Row(jCol.offsets[0].values.col.Rows() - 1)) + typedPath := strings.Trim(typedPathParts[0], "`") + typeName := strings.TrimSpace(typedPathParts[1]) + + c.typedPaths = append(c.typedPaths, typedPath) + c.typedPathsIndex[typedPath] = len(c.typedPaths) - 1 + + col, err := Type(typeName).Column("", tz) + if err != nil { + return nil, fmt.Errorf("failed to init column of type \"%s\" at path \"%s\": %w", typeName, typedPath, err) } + + c.typedColumns = append(c.typedColumns, col) } + + return c, nil } -func getStructFieldName(field reflect.StructField) (string, bool) { - name := field.Name - tag := field.Tag.Get("json") - // not a standard but we allow - to omit fields - if tag == "-" { - return name, true - } - if tag != "" { - return tag, false - } - // support ch tag as well as this is used elsewhere - tag = field.Tag.Get("ch") - if tag == "-" { - return name, true - } - if tag != "" { - return tag, false - } - return name, false +func (c *JSON) hasTypedPath(path string) bool { + _, ok := c.typedPathsIndex[path] + return ok } -// ensures numeric keys and ` are escaped properly -func getMapFieldName(name string) string { - if !escapeColRegex.MatchString(name) { - return fmt.Sprintf("`%s`", colEscape.Replace(name)) - } - return colEscape.Replace(name) +func (c *JSON) hasDynamicPath(path string) bool { + _, ok := c.dynamicPathsIndex[path] + return ok } -func parseSlice(name string, values any, jCol JSONParent, preFill int) error { - fType := reflect.TypeOf(values).Elem() - sKind := fType.Kind() - rValues := reflect.ValueOf(values) +func (c *JSON) hasSkipPath(path string) bool { + _, ok := c.skipPathsIndex[path] + return ok +} - if sKind == reflect.Interface { - //use the first element to determine if it is a complex or primitive map - after this we need consistent dimensions - if rValues.Len() == 0 { - return nil - } - var value reflect.Value - for 
i := 0; i < rValues.Len(); i++ { - value = rValues.Index(i).Elem() - if value.IsValid() { - break - } +// pathHasNestedValues returns true if the provided path has child paths in typed or dynamic paths +func (c *JSON) pathHasNestedValues(path string) bool { + for _, typedPath := range c.typedPaths { + if strings.HasPrefix(typedPath, path+".") { + return true } - if !value.IsValid() { - return nil - } - fType = value.Type() - sKind = value.Kind() } - if _, ok := typeMappings[fType.String()]; ok { - return parseType(name, fType, values, true, jCol, preFill) - } else if sKind == reflect.Struct || sKind == reflect.Map || sKind == reflect.Slice { - if rValues.Len() == 0 { - return nil - } - col, err := jCol.upsertList(name) - if err != nil { - return err - } - col.createNewOffsets(preFill + 1) - for i := 0; i < rValues.Len(); i++ { - // increment offset - col.offsets[0].values.col[col.offsets[0].values.col.Rows()-1] += 1 - value := rValues.Index(i) - sKind = value.Kind() - if sKind == reflect.Interface { - sKind = value.Elem().Kind() - } - switch sKind { - case reflect.Struct: - col.isNested = true - if err = iterateStruct(value, col, 0); err != nil { - return err - } - case reflect.Map: - col.isNested = true - if err = iterateMap(value, col, 0); err != nil { - return err - } - case reflect.Slice: - if err = parseSlice("", value.Interface(), col, 0); err != nil { - return err - } - default: - // only happens if slice has a primitive mixed with complex types in a []any - return &Error{ - ColumnType: fmt.Sprint(sKind), - Err: fmt.Errorf("slices must be same dimension in column %s", col.Name()), - } - } + for _, dynamicPath := range c.dynamicPaths { + if strings.HasPrefix(dynamicPath, path+".") { + return true } - return nil } - return parsePrimitive(name, sKind, values, true, jCol, preFill) -} -func parseStruct(name string, structVal reflect.Value, jCol JSONParent, preFill int) error { - col, err := jCol.upsertObject(name) - if err != nil { - return err - } - return iterateStruct(structVal, col, preFill) + return false } -func iterateStruct(structVal reflect.Value, col JSONParent, preFill int) error { - // structs generally have consistent field counts but we ignore nil values that are any as we can't infer from - // these until they occur - so we might need to either backfill when to do occur or insert empty based on previous - if structVal.Kind() == reflect.Interface { - // can happen if passed from []any - structVal = structVal.Elem() +// valueAtPath returns the row value at the specified path, typed or dynamic +func (c *JSON) valueAtPath(path string, row int, ptr bool) any { + if colIndex, ok := c.typedPathsIndex[path]; ok { + return c.typedColumns[colIndex].Row(row, ptr) } - currentColumns := col.columnNames() - columnLookup := make(map[string]struct{}) - numRows := col.rows() - for _, name := range currentColumns { - columnLookup[name] = struct{}{} + if colIndex, ok := c.dynamicPathsIndex[path]; ok { + return c.dynamicColumns[colIndex].Row(row, ptr) } - addedColumns := make([]string, structVal.NumField(), structVal.NumField()) - newColumn := false - for i := 0; i < structVal.NumField(); i++ { - fName, omit := getStructFieldName(structVal.Type().Field(i)) - if omit { - continue - } - field := structVal.Field(i) - if !field.CanInterface() { - // can't interface - likely not exported so ignore the field - continue - } - kind := field.Kind() - value := field.Interface() - fType := field.Type() - //resolve underlying kind - if kind == reflect.Interface { - if value == nil { - // ignore nil 
fields - continue - } - kind = reflect.TypeOf(value).Kind() - field = reflect.ValueOf(value) - fType = field.Type() - } - if _, ok := columnLookup[fName]; !ok && len(currentColumns) > 0 { - // new column - need to handle missing - preFill = numRows - newColumn = true - } - if _, ok := typeMappings[fType.String()]; ok { - if err := parseType(fName, fType, value, false, col, preFill); err != nil { - return err - } - } else { - switch kind { - case reflect.Slice: - if reflect.ValueOf(value).Len() == 0 { - continue - } - if err := parseSlice(fName, value, col, preFill); err != nil { - return err - } - case reflect.Struct: - if err := parseStruct(fName, field, col, preFill); err != nil { - return err - } - case reflect.Map: - if err := parseMap(fName, field, col, preFill); err != nil { - return err - } - default: - if err := parsePrimitive(fName, kind, value, false, col, preFill); err != nil { - return err - } - } - } - addedColumns[i] = fName - if newColumn { - // reset as otherwise prefill overflow to other fields. But don't reset if this prefill has come from - // a higher level - preFill = 0 - } - } - // handle missing - missingColumns := difference(currentColumns, addedColumns) - for _, name := range missingColumns { - if err := col.insertEmptyColumn(name); err != nil { - return err - } - } return nil } -func parseMap(name string, mapVal reflect.Value, jCol JSONParent, preFill int) error { - if mapVal.Type().Key().Kind() != reflect.String { - return &Error{ - ColumnType: fmt.Sprint(mapVal.Type().Key().Kind()), - Err: fmt.Errorf("map keys must be string for column %s", name), - } +// scanTypedPathToValue scans the provided typed path into a `reflect.Value` +func (c *JSON) scanTypedPathToValue(path string, row int, value reflect.Value) error { + colIndex, ok := c.typedPathsIndex[path] + if !ok { + return fmt.Errorf("typed path \"%s\" does not exist in JSON column", path) } - col, err := jCol.upsertObject(name) + + col := c.typedColumns[colIndex] + err := col.ScanRow(value.Addr().Interface(), row) if err != nil { - return err - } - return iterateMap(mapVal, col, preFill) -} - -func iterateMap(mapVal reflect.Value, col JSONParent, preFill int) error { - // maps can have inconsistent numbers of elements - we must ensure they are consistent in the encoding - // two inconsistent options - 1. new - map has new columns 2. massing - map has missing columns - // for (1) we need to update previous, for (2) we need to ensure we add a null entry - if mapVal.Kind() == reflect.Interface { - // can happen if passed from []any - mapVal = mapVal.Elem() - } - - currentColumns := col.columnNames() - //gives us a fast lookup for large maps - columnLookup := make(map[string]struct{}) - numRows := col.rows() - // true if we need nil values - for _, name := range currentColumns { - columnLookup[name] = struct{}{} - } - addedColumns := make([]string, len(mapVal.MapKeys()), len(mapVal.MapKeys())) - newColumn := false - for i, key := range mapVal.MapKeys() { - if newColumn { - // reset as otherwise prefill overflow to other fields. 
But don't reset if this prefill has come from - // a higher level - preFill = 0 - } - - name := getMapFieldName(key.Interface().(string)) - if _, ok := columnLookup[name]; !ok && len(currentColumns) > 0 { - // new column - need to handle - preFill = numRows - newColumn = true - } - field := mapVal.MapIndex(key) - kind := field.Kind() - fType := field.Type() - - if kind == reflect.Interface { - if field.Interface() == nil { - // ignore nil fields - continue - } - kind = reflect.TypeOf(field.Interface()).Kind() - field = reflect.ValueOf(field.Interface()) - fType = field.Type() - } - if _, ok := typeMappings[fType.String()]; ok { - if err := parseType(name, fType, field.Interface(), false, col, preFill); err != nil { - return err - } - } else { - switch kind { - case reflect.Struct: - if err := parseStruct(name, field, col, preFill); err != nil { - return err - } - case reflect.Slice: - if err := parseSlice(name, field.Interface(), col, preFill); err != nil { - return err - } - case reflect.Map: - if err := parseMap(name, field, col, preFill); err != nil { - return err - } - default: - if err := parsePrimitive(name, kind, field.Interface(), false, col, preFill); err != nil { - return err - } - } - } - addedColumns[i] = name - } - // handle missing - missingColumns := difference(currentColumns, addedColumns) - for _, name := range missingColumns { - if err := col.insertEmptyColumn(name); err != nil { - return err - } + return fmt.Errorf("failed to scan %s column into typed path \"%s\": %w", col.Type(), path, err) } + return nil } -func appendStructOrMap(jCol *JSONObject, data any) error { - vData := reflect.ValueOf(data) - kind := vData.Kind() - if kind == reflect.Struct { - return iterateStruct(vData, jCol, 0) - } - if kind == reflect.Map { - if reflect.TypeOf(data).Key().Kind() != reflect.String { - return &Error{ - ColumnType: fmt.Sprint(reflect.TypeOf(data).Key().Kind()), - Err: fmt.Errorf("map keys must be string for column %s", jCol.Name()), - } - } - if jCol.columns == nil && vData.Len() == 0 { - // if map is empty, we need to create an empty Tuple to make sure subcolumns protocol is happy - // _dummy is a ClickHouse internal name for empty Tuple subcolumn - // it has the same effect as `INSERT INTO single_json_type_table VALUES ('{}');` - jCol.upsertValue("_dummy", "Int8") - return jCol.insertEmptyColumn("_dummy") - } - return iterateMap(vData, jCol, 0) +// scanDynamicPathToValue scans the provided typed path into a `reflect.Value` +func (c *JSON) scanDynamicPathToValue(path string, row int, value reflect.Value) error { + colIndex, ok := c.dynamicPathsIndex[path] + if !ok { + return fmt.Errorf("dynamic path \"%s\" does not exist in JSON column", path) } - return &UnsupportedColumnTypeError{ - t: Type(fmt.Sprint(kind)), + + col := c.dynamicColumns[colIndex] + err := col.ScanRow(value.Addr().Interface(), row) + if err != nil { + return fmt.Errorf("failed to scan %s column into dynamic path \"%s\": %w", col.Type(), path, err) } -} -type JSONValue struct { - Interface - // represents the type e.g. uuid - these may have been mapped to a Column type support by JSON e.g. 
String - origType reflect.Type + return nil } -func (jCol *JSONValue) Reset() { - jCol.Interface.Reset() -} +func (c *JSON) rowAsJSON(row int) *chcol.JSON { + obj := chcol.NewJSON() -func (jCol *JSONValue) appendEmptyValue() error { - switch jCol.Interface.(type) { - case *Array: - if jCol.Rows() > 0 { - return jCol.AppendRow(reflect.MakeSlice(reflect.TypeOf(jCol.Row(0, false)), 0, 0).Interface()) - } - return &Error{ - ColumnType: "unknown", - Err: fmt.Errorf("can't add empty value to column %s - no entries to infer type", jCol.Name()), - } - default: - // can't just append nil here as we need a custom nil value for the type - if jCol.origType != nil { - return jCol.AppendRow(fmt.Sprint(reflect.New(jCol.origType).Elem().Interface())) - } - return jCol.AppendRow(nil) + for i, path := range c.typedPaths { + col := c.typedColumns[i] + obj.SetValueAtPath(path, col.Row(row, false)) + } + + for i, path := range c.dynamicPaths { + col := c.dynamicColumns[i] + obj.SetValueAtPath(path, col.Row(row, false)) } -} -func (jCol *JSONValue) Type() Type { - return Type(fmt.Sprintf("%s %s", jCol.Name(), jCol.Interface.Type())) + return obj } -type JSONList struct { - Array - name string - isNested bool // indicates if this a list of objects i.e. a Nested +func (c *JSON) Name() string { + return c.name } -func (jCol *JSONList) Name() string { - return jCol.name +func (c *JSON) Type() Type { + return c.chType } -func (jCol *JSONList) columnNames() []string { - return jCol.Array.values.(*JSONObject).columnNames() +func (c *JSON) Rows() int { + return c.rows } -func (jCol *JSONList) rows() int { - return jCol.values.(*JSONObject).Rows() +func (c *JSON) Row(row int, ptr bool) any { + switch c.serializationVersion { + case JSONObjectSerializationVersion: + return c.rowAsJSON(row) + case JSONStringSerializationVersion: + return c.jsonStrings.Row(row, ptr) + default: + return nil + } } -func createJSONList(name string, tz *time.Location) (jCol *JSONList) { - // lists are represented as Nested which are in turn encoded as Array(Tuple()). We thus pass a Array(JSONObject()) - // as this encodes like a tuple - lCol := &JSONList{ - name: name, +func (c *JSON) ScanRow(dest any, row int) error { + switch c.serializationVersion { + case JSONObjectSerializationVersion: + return c.scanRowObject(dest, row) + case JSONStringSerializationVersion: + return c.scanRowString(dest, row) + default: + return fmt.Errorf("unsupported JSON serialization version for scan: %d", c.serializationVersion) } - lCol.values = &JSONObject{tz: tz} - // depth should always be one as nested arrays aren't possible - lCol.depth = 1 - lCol.scanType = scanTypeSlice - offsetScanTypes := []reflect.Type{lCol.scanType} - lCol.offsets = []*offset{{ - scanType: offsetScanTypes[0], - }} - return lCol } -func (jCol *JSONList) appendEmptyValue() error { - // only need to bump the offsets - jCol.createNewOffsets(1) - return nil +func (c *JSON) scanRowObject(dest any, row int) error { + switch v := dest.(type) { + case *chcol.JSON: + obj := c.rowAsJSON(row) + *v = *obj + return nil + case **chcol.JSON: + obj := c.rowAsJSON(row) + **v = *obj + return nil + } + + switch val := reflect.ValueOf(dest); val.Kind() { + case reflect.Pointer: + if val.Elem().Kind() == reflect.Struct { + return c.scanIntoStruct(dest, row) + } else if val.Elem().Kind() == reflect.Map { + return c.scanIntoMap(dest, row) + } + } + + return fmt.Errorf("destination must be a pointer to struct or map, or %s. 
hint: enable \"output_format_native_write_json_as_string\" setting for string decoding", scanTypeJSON.String()) } -func (jCol *JSONList) insertEmptyColumn(name string) error { - return jCol.values.(*JSONObject).insertEmptyColumn(name) +func (c *JSON) scanRowString(dest any, row int) error { + return c.jsonStrings.ScanRow(dest, row) } -func (jCol *JSONList) upsertValue(name string, ct string) (*JSONValue, error) { - // check if column exists and reuse if same type, error if same name and different type - jObj := jCol.values.(*JSONObject) - cols := jObj.columns - for i := range cols { - sCol := cols[i] - if sCol.Name() == name { - vCol, ok := cols[i].(*JSONValue) - if !ok { - sType := cols[i].Type() - return nil, &Error{ - ColumnType: fmt.Sprint(sType), - Err: fmt.Errorf("type mismatch in column %s - expected value, got %s", name, sType), - } - } - tType := vCol.Interface.Type() - if tType != Type(ct) { - return nil, &Error{ - ColumnType: ct, - Err: fmt.Errorf("type mismatch in column %s - expected %s, got %s", name, tType, ct), - } - } - return vCol, nil +func (c *JSON) Append(v any) (nulls []uint8, err error) { + switch c.serializationVersion { + case JSONObjectSerializationVersion: + return c.appendObject(v) + case JSONStringSerializationVersion: + return c.appendString(v) + default: + // Unset serialization preference, try string first unless its specifically JSON + switch v.(type) { + case []chcol.JSON: + c.serializationVersion = JSONObjectSerializationVersion + return c.appendObject(v) + case []*chcol.JSON: + c.serializationVersion = JSONObjectSerializationVersion + return c.appendObject(v) } - } - col, err := Type(ct).Column(name, jObj.tz) - if err != nil { - return nil, err - } - vCol := &JSONValue{ - Interface: col, - } - jCol.values.(*JSONObject).columns = append(cols, vCol) // nolint:gocritic - return vCol, nil -} - -func (jCol *JSONList) upsertList(name string) (*JSONList, error) { - // check if column exists and reuse if same type, error if same name and different type - jObj := jCol.values.(*JSONObject) - cols := jCol.values.(*JSONObject).columns - for i := range cols { - sCol := cols[i] - if sCol.Name() == name { - sCol, ok := cols[i].(*JSONList) - if !ok { - return nil, &Error{ - ColumnType: fmt.Sprint(cols[i].Type()), - Err: fmt.Errorf("type mismatch in column %s - expected list, got %s", name, cols[i].Type()), - } - } - return sCol, nil + + var err error + if _, err = c.appendString(v); err == nil { + c.serializationVersion = JSONStringSerializationVersion + return nil, nil + } else if _, err = c.appendObject(v); err == nil { + c.serializationVersion = JSONObjectSerializationVersion + return nil, nil } - } - lCol := createJSONList(name, jObj.tz) - jCol.values.(*JSONObject).columns = append(cols, lCol) // nolint:gocritic - return lCol, nil + return nil, fmt.Errorf("unsupported type \"%s\" for JSON column, must use slice of string, []byte, struct, map, or *%s: %w", reflect.TypeOf(v).String(), scanTypeJSON.String(), err) + } } -func (jCol *JSONList) upsertObject(name string) (*JSONObject, error) { - // check if column exists and reuse if same type, error if same name and different type - jObj := jCol.values.(*JSONObject) - cols := jObj.columns - for i := range cols { - sCol := cols[i] - if sCol.Name() == name { - sCol, ok := cols[i].(*JSONObject) - if !ok { - sType := cols[i].Type() - return nil, &Error{ - ColumnType: fmt.Sprint(sType), - Err: fmt.Errorf("type mismatch in column %s, expected object got %s", name, sType), - } +func (c *JSON) appendObject(v any) (nulls []uint8, 
err error) { + switch vv := v.(type) { + case []chcol.JSON: + for i, obj := range vv { + err := c.AppendRow(obj) + if err != nil { + return nil, fmt.Errorf("failed to AppendRow at index %d: %w", i, err) } - return sCol, nil } + + return nil, nil + case []*chcol.JSON: + for i, obj := range vv { + err := c.AppendRow(obj) + if err != nil { + return nil, fmt.Errorf("failed to AppendRow at index %d: %w", i, err) + } + } + + return nil, nil } - // lists are represented as Nested which are in turn encoded as Array(Tuple()). We thus pass a Array(JSONObject()) - // as this encodes like a tuple - oCol := &JSONObject{ - name: name, - tz: jObj.tz, - } - jCol.values.(*JSONObject).columns = append(cols, oCol) // nolint:gocritic - return oCol, nil -} -func (jCol *JSONList) Type() Type { - cols := jCol.values.(*JSONObject).columns - subTypes := make([]string, len(cols)) - for i, v := range cols { - subTypes[i] = string(v.Type()) + value := reflect.Indirect(reflect.ValueOf(v)) + if value.Kind() != reflect.Slice { + return nil, &ColumnConverterError{ + Op: "Append", + To: string(c.chType), + From: fmt.Sprintf("%T", v), + Hint: "value must be a slice", + } } - // can be a list of lists or a nested - if jCol.isNested { - return Type(fmt.Sprintf("%s Nested(%s)", jCol.name, strings.Join(subTypes, ", "))) + for i := 0; i < value.Len(); i++ { + if err := c.AppendRow(value.Index(i)); err != nil { + return nil, err + } } - return Type(fmt.Sprintf("%s Array(%s)", jCol.name, strings.Join(subTypes, ", "))) -} -type JSONObject struct { - columns []JSON - name string - root bool - encoding uint8 - tz *time.Location + return nil, nil } -func (jCol *JSONObject) Reset() { - for i := range jCol.columns { - jCol.columns[i].Reset() +func (c *JSON) appendString(v any) (nulls []uint8, err error) { + nulls, err = c.jsonStrings.Append(v) + if err != nil { + return nil, err } -} -func (jCol *JSONObject) Name() string { - return jCol.name + c.rows = c.jsonStrings.Rows() + return nulls, nil } -func (jCol *JSONObject) columnNames() []string { - columns := make([]string, len(jCol.columns), len(jCol.columns)) - for i := range jCol.columns { - columns[i] = jCol.columns[i].Name() +func (c *JSON) AppendRow(v any) error { + switch c.serializationVersion { + case JSONObjectSerializationVersion: + return c.appendRowObject(v) + case JSONStringSerializationVersion: + return c.appendRowString(v) + default: + // Unset serialization preference, try string first unless its specifically JSON + switch v.(type) { + case chcol.JSON: + c.serializationVersion = JSONObjectSerializationVersion + return c.appendRowObject(v) + case *chcol.JSON: + c.serializationVersion = JSONObjectSerializationVersion + return c.appendRowObject(v) + } + + var err error + if err = c.appendRowString(v); err == nil { + c.serializationVersion = JSONStringSerializationVersion + return nil + } else if err = c.appendRowObject(v); err == nil { + c.serializationVersion = JSONObjectSerializationVersion + return nil + } + + return fmt.Errorf("unsupported type \"%s\" for JSON column, must use string, []byte, struct, map, or *%s: %w", reflect.TypeOf(v).String(), scanTypeJSON.String(), err) } - return columns } -func (jCol *JSONObject) rows() int { - return jCol.Rows() -} +func (c *JSON) appendRowObject(v any) error { + var obj *chcol.JSON + switch vv := v.(type) { + case chcol.JSON: + obj = &vv + case *chcol.JSON: + obj = vv + } + + if obj == nil && v != nil { + var err error + switch val := reflect.ValueOf(v); val.Kind() { + case reflect.Pointer: + if val.Elem().Kind() == 
reflect.Struct { + obj, err = structToJSON(v) + } else if val.Elem().Kind() == reflect.Map { + obj, err = mapToJSON(v) + } + case reflect.Struct: + obj, err = structToJSON(v) + case reflect.Map: + obj, err = mapToJSON(v) + } -func (jCol *JSONObject) appendEmptyValue() error { - for i := range jCol.columns { - if err := jCol.columns[i].appendEmptyValue(); err != nil { - return err + if err != nil { + return fmt.Errorf("failed to convert value to JSON: %w", err) } } - return nil -} -func (jCol *JSONObject) insertEmptyColumn(name string) error { - for i := range jCol.columns { - if jCol.columns[i].Name() == name { - if err := jCol.columns[i].appendEmptyValue(); err != nil { - return err - } - return nil + if obj == nil { + obj = chcol.NewJSON() + } + valuesByPath := obj.ValuesByPath() + + // Match typed paths first + for i, typedPath := range c.typedPaths { + // Even if value is nil, we must append a value for this row. + // nil is a valid value for most column types, with most implementations putting a zero value. + // If the column doesn't support appending nil, then the user must provide a zero value. + value, _ := valuesByPath[typedPath] + + col := c.typedColumns[i] + err := col.AppendRow(value) + if err != nil { + return fmt.Errorf("failed to append type %s to json column at typed path %s: %w", col.Type(), typedPath, err) } } - return &Error{ - ColumnType: "unknown", - Err: fmt.Errorf("column %s is missing - empty value cannot be appended", name), + + // Verify all dynamic paths have an equal number of rows by appending nil for all unspecified dynamic paths + for _, dynamicPath := range c.dynamicPaths { + if _, ok := valuesByPath[dynamicPath]; !ok { + valuesByPath[dynamicPath] = nil + } } -} -func (jCol *JSONObject) upsertValue(name string, ct string) (*JSONValue, error) { - for i := range jCol.columns { - sCol := jCol.columns[i] - if sCol.Name() == name { - vCol, ok := jCol.columns[i].(*JSONValue) - if !ok { - sType := jCol.columns[i].Type() - return nil, &Error{ - ColumnType: fmt.Sprint(sType), - Err: fmt.Errorf("type mismatch in column %s, expected value got %s", name, sType), - } + // Match or add dynamic paths + for objPath, value := range valuesByPath { + if c.hasTypedPath(objPath) || c.hasSkipPath(objPath) { + continue + } + + if dynamicPathIndex, ok := c.dynamicPathsIndex[objPath]; ok { + err := c.dynamicColumns[dynamicPathIndex].AppendRow(value) + if err != nil { + return fmt.Errorf("failed to append to json column at dynamic path \"%s\": %w", objPath, err) } - if vCol.Interface.Type() != Type(ct) { - return nil, &Error{ - ColumnType: ct, - Err: fmt.Errorf("type mismatch in column %s, expected %s got %s", name, vCol.Interface.Type(), ct), + } else { + // Path doesn't exist, add new dynamic path + column + parsedColDynamic, _ := Type("Dynamic").Column("", c.tz) + colDynamic := parsedColDynamic.(*Dynamic) + + // New path must back-fill nils for each row + for i := 0; i < c.rows; i++ { + err := colDynamic.AppendRow(nil) + if err != nil { + return fmt.Errorf("failed to back-fill json column at new dynamic path \"%s\" index %d: %w", objPath, i, err) } } - return vCol, nil + + err := colDynamic.AppendRow(value) + if err != nil { + return fmt.Errorf("failed to append to json column at new dynamic path \"%s\": %w", objPath, err) + } + + c.dynamicPaths = append(c.dynamicPaths, objPath) + c.dynamicPathsIndex[objPath] = len(c.dynamicPaths) - 1 + c.dynamicColumns = append(c.dynamicColumns, colDynamic) + c.totalDynamicPaths++ } } - col, err := Type(ct).Column(name, jCol.tz) - if err != nil { 
- return nil, err - } - vCol := &JSONValue{ - Interface: col, - } - jCol.columns = append(jCol.columns, vCol) - return vCol, nil + + c.rows++ + return nil } -func (jCol *JSONObject) upsertList(name string) (*JSONList, error) { - for i := range jCol.columns { - sCol := jCol.columns[i] - if sCol.Name() == name { - sCol, ok := jCol.columns[i].(*JSONList) - if !ok { - sType := jCol.columns[i].Type() - return nil, &Error{ - ColumnType: fmt.Sprint(sType), - Err: fmt.Errorf("type mismatch in column %s, expected list got %s", name, sType), - } - } - return sCol, nil - } +func (c *JSON) appendRowString(v any) error { + err := c.jsonStrings.AppendRow(v) + if err != nil { + return err } - lCol := createJSONList(name, jCol.tz) - jCol.columns = append(jCol.columns, lCol) - return lCol, nil + + c.rows++ + return nil } -func (jCol *JSONObject) upsertObject(name string) (*JSONObject, error) { - // check if it exists - for i := range jCol.columns { - sCol := jCol.columns[i] - if sCol.Name() == name { - sCol, ok := jCol.columns[i].(*JSONObject) - if !ok { - sType := jCol.columns[i].Type() - return nil, &Error{ - ColumnType: fmt.Sprint(sType), - Err: fmt.Errorf("type mismatch in column %s, expected object got %s", name, sType), - } - } - return sCol, nil - } +func (c *JSON) encodeObjectHeader(buffer *proto.Buffer) { + buffer.PutUVarInt(uint64(c.maxDynamicPaths)) + buffer.PutUVarInt(uint64(c.totalDynamicPaths)) + + for _, dynamicPath := range c.dynamicPaths { + buffer.PutString(dynamicPath) } - // not present so create - oCol := &JSONObject{ - name: name, - tz: jCol.tz, + + for _, col := range c.dynamicColumns { + col.encodeHeader(buffer) } - jCol.columns = append(jCol.columns, oCol) - return oCol, nil } -func (jCol *JSONObject) Type() Type { - if jCol.root { - return "Object('json')" +func (c *JSON) encodeObjectData(buffer *proto.Buffer) { + for _, col := range c.typedColumns { + col.Encode(buffer) } - return jCol.FullType() -} -func (jCol *JSONObject) FullType() Type { - subTypes := make([]string, len(jCol.columns)) - for i, v := range jCol.columns { - subTypes[i] = string(v.Type()) + for _, col := range c.dynamicColumns { + col.encodeData(buffer) } - if jCol.root { - return Type(fmt.Sprintf("Tuple(%s)", strings.Join(subTypes, ", "))) + + // SharedData per row, empty for now. + for i := 0; i < c.rows; i++ { + buffer.PutUInt64(0) } - return Type(fmt.Sprintf("%s Tuple(%s)", jCol.name, strings.Join(subTypes, ", "))) } -func (jCol *JSONObject) ScanType() reflect.Type { - return scanTypeMap +func (c *JSON) encodeStringData(buffer *proto.Buffer) { + c.jsonStrings.Encode(buffer) } -func (jCol *JSONObject) Rows() int { - if len(jCol.columns) != 0 { - return jCol.columns[0].Rows() +func (c *JSON) Encode(buffer *proto.Buffer) { + switch c.serializationVersion { + case JSONObjectSerializationVersion: + buffer.PutUInt64(JSONObjectSerializationVersion) + c.encodeObjectHeader(buffer) + c.encodeObjectData(buffer) + return + case JSONStringSerializationVersion: + buffer.PutUInt64(JSONStringSerializationVersion) + c.encodeStringData(buffer) + return } - return 0 } -// ClickHouse returns JSON as a tuple i.e. 
these will never be invoked - -func (jCol *JSONObject) Row(i int, ptr bool) any { - panic("Not implemented") +func (c *JSON) ScanType() reflect.Type { + return scanTypeJSON } -func (jCol *JSONObject) ScanRow(dest any, row int) error { - panic("Not implemented") -} +func (c *JSON) Reset() { + c.rows = 0 -func (jCol *JSONObject) Append(v any) (nulls []uint8, err error) { - jSlice := reflect.ValueOf(v) - if jSlice.Kind() != reflect.Slice { - return nil, &ColumnConverterError{ - Op: "Append", - To: string(jCol.Type()), - From: fmt.Sprintf("slice of structs/map or strings required - received %T", v), + switch c.serializationVersion { + case JSONObjectSerializationVersion: + for _, col := range c.typedColumns { + col.Reset() } - } - for i := 0; i < jSlice.Len(); i++ { - if err := jCol.AppendRow(jSlice.Index(i).Interface()); err != nil { - return nil, err + + for _, col := range c.dynamicColumns { + col.Reset() } + + return + case JSONStringSerializationVersion: + c.jsonStrings.Reset() + return } - return nil, nil } -func (jCol *JSONObject) AppendRow(v any) error { - if reflect.ValueOf(v).Kind() == reflect.Struct || reflect.ValueOf(v).Kind() == reflect.Map { - if jCol.columns != nil && jCol.encoding == 1 { - return &Error{ - ColumnType: fmt.Sprint(jCol.Type()), - Err: fmt.Errorf("encoding of JSON columns cannot be mixed in a batch - %s cannot be added as previously String", reflect.ValueOf(v).Kind()), - } +func (c *JSON) decodeObjectHeader(reader *proto.Reader) error { + maxDynamicPaths, err := reader.UVarInt() + if err != nil { + return fmt.Errorf("failed to read max dynamic paths for json column: %w", err) + } + c.maxDynamicPaths = int(maxDynamicPaths) + + totalDynamicPaths, err := reader.UVarInt() + if err != nil { + return fmt.Errorf("failed to read total dynamic paths for json column: %w", err) + } + c.totalDynamicPaths = int(totalDynamicPaths) + + c.dynamicPaths = make([]string, 0, totalDynamicPaths) + for i := 0; i < int(totalDynamicPaths); i++ { + dynamicPath, err := reader.Str() + if err != nil { + return fmt.Errorf("failed to read dynamic path name bytes at index %d for json column: %w", i, err) } - err := appendStructOrMap(jCol, v) - return err + + c.dynamicPaths = append(c.dynamicPaths, dynamicPath) + c.dynamicPathsIndex[dynamicPath] = len(c.dynamicPaths) - 1 } - switch v := v.(type) { - case string: - if jCol.columns != nil && jCol.encoding == 0 { - return &Error{ - ColumnType: fmt.Sprint(jCol.Type()), - Err: fmt.Errorf("encoding of JSON columns cannot be mixed in a batch - %s cannot be added as previously Struct/Map", reflect.ValueOf(v).Kind()), - } + + c.dynamicColumns = make([]*Dynamic, 0, totalDynamicPaths) + for _, dynamicPath := range c.dynamicPaths { + parsedColDynamic, _ := Type("Dynamic").Column("", c.tz) + colDynamic := parsedColDynamic.(*Dynamic) + + err := colDynamic.decodeHeader(reader) + if err != nil { + return fmt.Errorf("failed to decode dynamic header at path %s for json column: %w", dynamicPath, err) } - jCol.encoding = 1 - if jCol.columns == nil { - jCol.columns = append(jCol.columns, &JSONValue{Interface: &String{}}) + + c.dynamicColumns = append(c.dynamicColumns, colDynamic) + } + + return nil +} + +func (c *JSON) decodeObjectData(reader *proto.Reader, rows int) error { + for i, col := range c.typedColumns { + typedPath := c.typedPaths[i] + + err := col.Decode(reader, rows) + if err != nil { + return fmt.Errorf("failed to decode %s typed path %s for json column: %w", col.Type(), typedPath, err) } - jCol.columns[0].AppendRow(v) - default: - return 
&ColumnConverterError{ - Op: "AppendRow", - To: "String", - From: fmt.Sprintf("json row must be struct, map or string - received %T", v), + } + + for i, col := range c.dynamicColumns { + dynamicPath := c.dynamicPaths[i] + + err := col.decodeData(reader, rows) + if err != nil { + return fmt.Errorf("failed to decode dynamic path %s for json column: %w", dynamicPath, err) } } + + // SharedData per row, ignored for now. May cause stream offset issues if present + _, err := reader.ReadRaw(8 * rows) // one UInt64 per row + if err != nil { + return fmt.Errorf("failed to read shared data for json column: %w", err) + } + return nil } -func (jCol *JSONObject) Decode(reader *proto.Reader, rows int) error { - panic("Not implemented") +func (c *JSON) decodeStringData(reader *proto.Reader, rows int) error { + return c.jsonStrings.Decode(reader, rows) } -func (jCol *JSONObject) Encode(buffer *proto.Buffer) { - if jCol.root && jCol.encoding == 0 { - buffer.PutString(string(jCol.FullType())) +func (c *JSON) Decode(reader *proto.Reader, rows int) error { + c.rows = rows + + jsonSerializationVersion, err := reader.UInt64() + if err != nil { + return fmt.Errorf("failed to read json serialization version: %w", err) } - for _, c := range jCol.columns { - c.Encode(buffer) + + c.serializationVersion = jsonSerializationVersion + + switch jsonSerializationVersion { + case JSONObjectSerializationVersion: + err := c.decodeObjectHeader(reader) + if err != nil { + return fmt.Errorf("failed to decode json object header: %w", err) + } + + err = c.decodeObjectData(reader, rows) + if err != nil { + return fmt.Errorf("failed to decode json object data: %w", err) + } + + return nil + case JSONStringSerializationVersion: + err = c.decodeStringData(reader, rows) + if err != nil { + return fmt.Errorf("failed to decode json string data: %w", err) + } + return nil + default: + return fmt.Errorf("unsupported JSON serialization version for decode: %d", jsonSerializationVersion) + } +} + +// splitWithDelimiters splits the string while considering backticks and parentheses +func splitWithDelimiters(s string) []string { + var parts []string + var currentPart strings.Builder + var brackets int + inBackticks := false + + for i := 0; i < len(s); i++ { + switch s[i] { + case '`': + inBackticks = !inBackticks + currentPart.WriteByte(s[i]) + case '(': + brackets++ + currentPart.WriteByte(s[i]) + case ')': + brackets-- + currentPart.WriteByte(s[i]) + case ',': + if !inBackticks && brackets == 0 { + parts = append(parts, currentPart.String()) + currentPart.Reset() + } else { + currentPart.WriteByte(s[i]) + } + default: + currentPart.WriteByte(s[i]) + } } -} -func (jCol *JSONObject) ReadStatePrefix(reader *proto.Reader) error { - _, err := reader.UInt8() - return err -} + if currentPart.Len() > 0 { + parts = append(parts, currentPart.String()) + } -func (jCol *JSONObject) WriteStatePrefix(buffer *proto.Buffer) error { - buffer.PutUInt8(jCol.encoding) - return nil + return parts } - -var ( - _ Interface = (*JSONObject)(nil) - _ CustomSerialization = (*JSONObject)(nil) -) diff --git a/lib/column/json_reflect.go b/lib/column/json_reflect.go new file mode 100644 index 0000000000..b38b1f6769 --- /dev/null +++ b/lib/column/json_reflect.go @@ -0,0 +1,450 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package column + +import ( + "fmt" + "reflect" + "strings" + + "github.com/ClickHouse/clickhouse-go/v2/lib/chcol" +) + +// Decoding (Scanning) + +// scanIntoStruct will iterate the provided struct and scan JSON data into the matching fields +func (c *JSON) scanIntoStruct(dest any, row int) error { + val := reflect.ValueOf(dest) + if val.Kind() != reflect.Pointer { + return fmt.Errorf("destination must be a pointer") + } + val = val.Elem() + + if val.Kind() != reflect.Struct { + return fmt.Errorf("destination must be a pointer to struct") + } + + return c.fillStruct(val, "", row) +} + +// scanIntoMap converts JSON data into a map +func (c *JSON) scanIntoMap(dest any, row int) error { + val := reflect.ValueOf(dest) + if val.Kind() != reflect.Pointer { + return fmt.Errorf("destination must be a pointer") + } + val = val.Elem() + + if val.Kind() != reflect.Map { + return fmt.Errorf("destination must be a pointer to map") + } + + if val.Type().Key().Kind() != reflect.String { + return fmt.Errorf("map key must be string") + } + + if val.IsNil() { + val.Set(reflect.MakeMap(val.Type())) + } + + return c.fillMap(val, "", row) +} + +// fillStruct will iterate the provided struct and scan JSON data into the matching fields recursively +func (c *JSON) fillStruct(val reflect.Value, prefix string, row int) error { + typ := val.Type() + + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + if !field.CanSet() { + continue + } + + name := fieldType.Tag.Get("json") + if name == "" { + name = fieldType.Name + } else { + name = strings.Split(name, ",")[0] + } + + if name == "-" { + continue + } + + path := name + if prefix != "" { + path = prefix + "." 
+ name + } + + if c.hasTypedPath(path) { + err := c.scanTypedPathToValue(path, row, field) + if err != nil { + return fmt.Errorf("fillStruct failed to scan typed path: %w", err) + } + + continue + } else if c.hasDynamicPath(path) { + err := c.scanDynamicPathToValue(path, row, field) + if err != nil { + return fmt.Errorf("fillStruct failed to scan dynamic path: %w", err) + } + + continue + } + + hasNestedFields := c.pathHasNestedValues(path) + if !hasNestedFields { + continue + } + + switch field.Kind() { + case reflect.Pointer: + if field.Type().Elem().Kind() == reflect.Struct { + if field.IsNil() { + field.Set(reflect.New(field.Type().Elem())) + } + + if err := c.fillStruct(field.Elem(), path, row); err != nil { + return fmt.Errorf("error filling nested struct pointer: %w", err) + } + } + case reflect.Struct: + if err := c.fillStruct(field, path, row); err != nil { + return fmt.Errorf("error filling nested struct: %w", err) + } + case reflect.Map: + if err := c.fillMap(field, path, row); err != nil { + return fmt.Errorf("error filling nested map: %w", err) + } + } + } + + return nil +} + +// fillMap will iterate the provided map and scan JSON data in recursively +func (c *JSON) fillMap(val reflect.Value, prefix string, row int) error { + if val.IsNil() { + val.Set(reflect.MakeMap(val.Type())) + } + + var paths []string + for _, path := range c.typedPaths { + if strings.HasPrefix(path, prefix) { + paths = append(paths, path) + } + } + for _, path := range c.dynamicPaths { + if strings.HasPrefix(path, prefix) { + paths = append(paths, path) + } + } + + children := make(map[string][]string) + prefixLen := len(prefix) + if prefixLen > 0 { + prefixLen++ // splitter + } + + for _, path := range paths { + if prefixLen >= len(path) { + continue + } + + suffix := path[prefixLen:] + nextDot := strings.Index(suffix, ".") + var current string + if nextDot == -1 { + current = suffix + } else { + current = suffix[:nextDot] + } + children[current] = append(children[current], path) + } + + for key, childPaths := range children { + noChildNodes := true + for _, path := range childPaths { + if strings.Contains(path[prefixLen:], ".") { + noChildNodes = false + break + } + } + + if noChildNodes { + fullPath := prefix + if prefix != "" { + fullPath += "." + } + fullPath += key + + mapValueType := val.Type().Elem() + newVal := reflect.New(mapValueType).Elem() + + var err error + if _, isTyped := c.typedPathsIndex[fullPath]; isTyped { + err = c.scanTypedPathToValue(fullPath, row, newVal) + } else { + if mapValueType.Kind() == reflect.Interface { + value := c.valueAtPath(fullPath, row, false) + if dyn, ok := value.(chcol.Dynamic); ok { + value = dyn.Any() + } + + if value != nil { + newVal.Set(reflect.ValueOf(value)) + } + } else { + err = c.scanDynamicPathToValue(fullPath, row, newVal) + } + } + if err != nil { + return fmt.Errorf("failed to scan value at path \"%s\": %w", fullPath, err) + } + + val.SetMapIndex(reflect.ValueOf(key), newVal) + } else { + newPrefix := prefix + if newPrefix != "" { + newPrefix += "." 
+ } + newPrefix += key + + mapValueType := val.Type().Elem() + var newMap reflect.Value + + if mapValueType.Kind() == reflect.Interface { + newMap = reflect.MakeMap(reflect.TypeOf(map[string]interface{}{})) + } else if mapValueType.Kind() == reflect.Map { + newMap = reflect.MakeMap(mapValueType) + } else { + return fmt.Errorf("invalid map value type for nested path \"%s\"", newPrefix) + } + + err := c.fillMap(newMap, newPrefix, row) + if err != nil { + return fmt.Errorf("failed filling nested map at path \"%s\": %w", newPrefix, err) + } + + val.SetMapIndex(reflect.ValueOf(key), newMap) + } + } + + return nil +} + +// Encoding (Append, AppendRow) + +// structToJSON converts a struct to JSON data +func structToJSON(v any) (*chcol.JSON, error) { + json := chcol.NewJSON() + val := reflect.ValueOf(v) + + if val.Kind() == reflect.Pointer { + val = val.Elem() + } + + if val.Kind() != reflect.Struct { + return nil, fmt.Errorf("expected struct, got %v", val.Kind()) + } + + err := iterateStruct(val, "", json) + if err != nil { + return nil, err + } + + return json, nil +} + +// mapToJSON converts a map to JSON data +func mapToJSON(v any) (*chcol.JSON, error) { + json := chcol.NewJSON() + val := reflect.ValueOf(v) + + if val.Kind() == reflect.Pointer { + val = val.Elem() + } + + if val.Kind() != reflect.Map { + return nil, fmt.Errorf("expected map, got %v", val.Kind()) + } + + if val.Type().Key().Kind() != reflect.String { + return nil, fmt.Errorf("map key must be string, got %v", val.Type().Key().Kind()) + } + + err := iterateMap(val, "", json) + if err != nil { + return nil, err + } + + return json, nil +} + +// iterateStruct recursively iterates through a struct and adds its fields to the JSON data +func iterateStruct(val reflect.Value, prefix string, json *chcol.JSON) error { + typ := val.Type() + + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + if !field.CanInterface() { + continue + } + + name := fieldType.Tag.Get("json") + if name == "" { + name = fieldType.Name + } else { + // handle `json:"name,omitempty"` + name = strings.Split(name, ",")[0] + } + + if name == "-" { + continue + } + + path := name + if prefix != "" { + path = prefix + "." + name + } + + forcedType := fieldType.Tag.Get("chType") + err := handleValue(field, path, json, forcedType) + if err != nil { + return err + } + } + + return nil +} + +// iterateStructSkipTypes is a set of struct types that will not be iterated. +// Instead, the value will be assigned directly for use within Dynamic row appending. 
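// Illustrative sketch, not part of this change: how the reflection helpers in this file are
// typically driven from user code. Field names come from the `json` struct tag (falling back to
// the Go field name, and "-" omits the field), nested structs become dotted paths, and the
// `chType` tag forces a ClickHouse type via a Dynamic wrapper. The table name below is
// hypothetical; connection setup is assumed to exist as in the examples added by this change.
package clickhouse_api_sketch

import (
	"context"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

type productMeta struct {
	Region string `json:"region"`
}

type product struct {
	ID        uint64      `json:"id" chType:"UInt64"`
	Name      string      `json:"name"`
	Meta      productMeta `json:"meta"`
	CreatedAt time.Time   `json:"created_at" chType:"DateTime64(3)"`
	Internal  string      `json:"-"` // omitted from the JSON column
}

// appendAndScanProduct writes a tagged struct into a JSON column and reads it back into the
// same struct type; the paths produced are "id", "name", "meta.region" and "created_at".
func appendAndScanProduct(ctx context.Context, conn driver.Conn) (product, error) {
	batch, err := conn.PrepareBatch(ctx, "INSERT INTO go_json_example (product)")
	if err != nil {
		return product{}, err
	}

	in := product{
		ID:        1234,
		Name:      "Book",
		Meta:      productMeta{Region: "us"},
		CreatedAt: time.Now().UTC().Truncate(time.Millisecond),
	}
	if err := batch.Append(in); err != nil {
		return product{}, err
	}
	if err := batch.Send(); err != nil {
		return product{}, err
	}

	var out product
	err = conn.QueryRow(ctx, "SELECT product FROM go_json_example").Scan(&out)
	return out, err
}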
+var iterateStructSkipTypes = map[reflect.Type]struct{}{ + scanTypeIP: {}, + scanTypeUUID: {}, + scanTypeTime: {}, + scanTypeRing: {}, + scanTypePoint: {}, + scanTypeBigInt: {}, + scanTypePolygon: {}, + scanTypeDecimal: {}, + scanTypeMultiPolygon: {}, + scanTypeVariant: {}, + scanTypeDynamic: {}, + scanTypeJSON: {}, +} + +// handleValue processes a single value and adds it to the JSON data +func handleValue(val reflect.Value, path string, json *chcol.JSON, forcedType string) error { + if val.Kind() == reflect.Interface { + val = val.Elem() + } + + if !val.IsValid() { + json.SetValueAtPath(path, nil) + return nil + } + + switch val.Kind() { + case reflect.Pointer: + if val.IsNil() { + json.SetValueAtPath(path, nil) + return nil + } + return handleValue(val.Elem(), path, json, forcedType) + + case reflect.Struct: + if _, ok := iterateStructSkipTypes[val.Type()]; ok { + json.SetValueAtPath(path, val.Interface()) + return nil + } + + return iterateStruct(val, path, json) + + case reflect.Map: + if forcedType == "" && val.Type().Elem().Kind() == reflect.Interface { + // Only iterate maps if they are map[string]interface{} + return iterateMap(val, path, json) + } else if forcedType == "" { + json.SetValueAtPath(path, val.Interface()) + return nil + } else { + json.SetValueAtPath(path, chcol.NewDynamicWithType(val.Interface(), forcedType)) + return nil + } + case reflect.Slice, reflect.Array: + if forcedType == "" { + json.SetValueAtPath(path, val.Interface()) + } else { + json.SetValueAtPath(path, chcol.NewDynamicWithType(val.Interface(), forcedType)) + } + return nil + default: + if forcedType == "" { + json.SetValueAtPath(path, val.Interface()) + } else { + json.SetValueAtPath(path, chcol.NewDynamicWithType(val.Interface(), forcedType)) + } + return nil + } +} + +const MaxMapPathDepth = 32 + +// iterateMap recursively iterates through a map and adds its values to the JSON data +func iterateMap(val reflect.Value, prefix string, json *chcol.JSON) error { + depth := len(strings.Split(prefix, ".")) + if depth > MaxMapPathDepth { + return fmt.Errorf("maximum nesting depth exceeded") + } + + for _, key := range val.MapKeys() { + if key.Kind() != reflect.String { + return fmt.Errorf("map key must be string, got %v", key.Kind()) + } + + path := key.String() + if prefix != "" { + path = prefix + "." + path + } + + mapValue := val.MapIndex(key) + + if mapValue.Kind() == reflect.Interface { + mapValue = mapValue.Elem() + } + + if mapValue.Kind() == reflect.Map { + if err := iterateMap(mapValue, path, json); err != nil { + return err + } + } else { + if err := handleValue(mapValue, path, json, ""); err != nil { + return err + } + } + } + + return nil +} diff --git a/lib/column/object_json.go b/lib/column/object_json.go new file mode 100644 index 0000000000..2806fea5b7 --- /dev/null +++ b/lib/column/object_json.go @@ -0,0 +1,953 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package column + +import ( + "fmt" + "reflect" + "strings" + "time" + + "github.com/ClickHouse/ch-go/proto" +) + +// This JSON type implementation was done for an experimental Object('JSON') type: +// https://clickhouse.com/docs/en/sql-reference/data-types/object-data-type +// It's already deprecated in ClickHouse and will be removed in the future. +// Since ClickHouse 24.8, the Object('JSON') type is no longer alias for JSON type. +// The new JSON type has been introduced: https://clickhouse.com/docs/en/sql-reference/data-types/newjson +// However, the new JSON type is not supported by the driver yet. +// +// This implementation is kept for backward compatibility and will be removed in the future. TODO: remove this + +// inverse mapping - go types to clickhouse types +var kindMappings = map[reflect.Kind]string{ + reflect.String: "String", + reflect.Int: "Int64", + reflect.Int8: "Int8", + reflect.Int16: "Int16", + reflect.Int32: "Int32", + reflect.Int64: "Int64", + reflect.Uint: "UInt64", + reflect.Uint8: "UInt8", + reflect.Uint16: "UInt16", + reflect.Uint32: "UInt32", + reflect.Uint64: "UInt64", + reflect.Float32: "Float32", + reflect.Float64: "Float64", + reflect.Bool: "Bool", +} + +// complex types for which a mapping exists - currently we map to String but could enhance in the future for other types +var typeMappings = map[string]struct{}{ + // currently JSON doesn't support DateTime, Decimal or IP so mapped to String + "time.Time": {}, + "decimal.Decimal": {}, + "net.IP": {}, + "uuid.UUID": {}, +} + +type ObjectJSON interface { + Interface + appendEmptyValue() error +} + +type JSONParent interface { + upsertValue(name string, ct string) (*JSONValue, error) + upsertList(name string) (*JSONList, error) + upsertObject(name string) (*JSONObject, error) + insertEmptyColumn(name string) error + columnNames() []string + rows() int +} + +func parseType(name string, vType reflect.Type, values any, isArray bool, jCol JSONParent, numEmpty int) error { + _, ok := typeMappings[vType.String()] + if !ok { + return &UnsupportedColumnTypeError{ + t: Type(vType.String()), + } + } + ct := "String" + if isArray { + ct = fmt.Sprintf("Array(%s)", ct) + } + col, err := jCol.upsertValue(name, ct) + if err != nil { + return err + } + col.origType = vType + + //pre pad with empty - e.g. 
for new values in maps + for i := 0; i < numEmpty; i++ { + if isArray { + // empty array for nil of the right type + err = col.AppendRow([]string{}) + } else { + // empty value of the type + err = col.AppendRow(fmt.Sprint(reflect.New(vType).Elem().Interface())) + } + if err != nil { + return err + } + } + if isArray { + iValues := reflect.ValueOf(values) + sValues := make([]string, iValues.Len(), iValues.Len()) + for i := 0; i < iValues.Len(); i++ { + sValues[i] = fmt.Sprint(iValues.Index(i).Interface()) + } + return col.AppendRow(sValues) + } + return col.AppendRow(fmt.Sprint(values)) +} + +func parsePrimitive(name string, kind reflect.Kind, values any, isArray bool, jCol JSONParent, numEmpty int) error { + ct, ok := kindMappings[kind] + if !ok { + return &UnsupportedColumnTypeError{ + t: Type(fmt.Sprintf("%s - %s", kind, reflect.TypeOf(values).String())), + } + } + var err error + if isArray { + ct = fmt.Sprintf("Array(%s)", ct) + // if we have a []any we will need to cast to the target column type - this will be based on the first + // values types. Inconsistent slices will fail. + values, err = convertSlice(values) + if err != nil { + return err + } + } + col, err := jCol.upsertValue(name, ct) + if err != nil { + return err + } + + //pre pad with empty - e.g. for new values in maps + for i := 0; i < numEmpty; i++ { + if isArray { + // empty array for nil of the right type + err = col.AppendRow(reflect.MakeSlice(reflect.TypeOf(values), 0, 0).Interface()) + } else { + err = col.AppendRow(nil) + } + if err != nil { + return err + } + } + + return col.AppendRow(values) +} + +// converts a []any of primitives to a typed slice +// maybe this can be done with reflection but likely slower. investigate. +// this uses the first value to determine the type - subsequent values must currently be of the same type - we might cast later +// but wider driver doesn't support e.g. 
int to int64 +func convertSlice(values any) (any, error) { + rValues := reflect.ValueOf(values) + if rValues.Len() == 0 || rValues.Index(0).Kind() != reflect.Interface { + return values, nil + } + var fType reflect.Type + for i := 0; i < rValues.Len(); i++ { + elem := rValues.Index(i).Elem() + if elem.IsValid() { + fType = elem.Type() + break + } + } + if fType == nil { + return []any{}, nil + } + typedSlice := reflect.MakeSlice(reflect.SliceOf(fType), 0, rValues.Len()) + for i := 0; i < rValues.Len(); i++ { + value := rValues.Index(i) + if value.IsNil() { + typedSlice = reflect.Append(typedSlice, reflect.Zero(fType)) + continue + } + if rValues.Index(i).Elem().Type() != fType { + return nil, &Error{ + ColumnType: fmt.Sprint(fType), + Err: fmt.Errorf("inconsistent slices are not supported - expected %s got %s", fType, rValues.Index(i).Elem().Type()), + } + } + typedSlice = reflect.Append(typedSlice, rValues.Index(i).Elem()) + } + return typedSlice.Interface(), nil +} + +func (jCol *JSONList) createNewOffsets(num int) { + for i := 0; i < num; i++ { + //single depth so can take 1st + if jCol.offsets[0].values.col.Rows() == 0 { + // first entry in the column + jCol.offsets[0].values.col.Append(0) + } else { + // entry for this object to see offset from last - offsets are cumulative + jCol.offsets[0].values.col.Append(jCol.offsets[0].values.col.Row(jCol.offsets[0].values.col.Rows() - 1)) + } + } +} + +func getStructFieldName(field reflect.StructField) (string, bool) { + name := field.Name + tag := field.Tag.Get("json") + // not a standard but we allow - to omit fields + if tag == "-" { + return name, true + } + if tag != "" { + return tag, false + } + // support ch tag as well as this is used elsewhere + tag = field.Tag.Get("ch") + if tag == "-" { + return name, true + } + if tag != "" { + return tag, false + } + return name, false +} + +// ensures numeric keys and ` are escaped properly +func getMapFieldName(name string) string { + if !escapeColRegex.MatchString(name) { + return fmt.Sprintf("`%s`", colEscape.Replace(name)) + } + return colEscape.Replace(name) +} + +func parseSlice(name string, values any, jCol JSONParent, preFill int) error { + fType := reflect.TypeOf(values).Elem() + sKind := fType.Kind() + rValues := reflect.ValueOf(values) + + if sKind == reflect.Interface { + //use the first element to determine if it is a complex or primitive map - after this we need consistent dimensions + if rValues.Len() == 0 { + return nil + } + var value reflect.Value + for i := 0; i < rValues.Len(); i++ { + value = rValues.Index(i).Elem() + if value.IsValid() { + break + } + } + if !value.IsValid() { + return nil + } + fType = value.Type() + sKind = value.Kind() + } + + if _, ok := typeMappings[fType.String()]; ok { + return parseType(name, fType, values, true, jCol, preFill) + } else if sKind == reflect.Struct || sKind == reflect.Map || sKind == reflect.Slice { + if rValues.Len() == 0 { + return nil + } + col, err := jCol.upsertList(name) + if err != nil { + return err + } + col.createNewOffsets(preFill + 1) + for i := 0; i < rValues.Len(); i++ { + // increment offset + col.offsets[0].values.col[col.offsets[0].values.col.Rows()-1] += 1 + value := rValues.Index(i) + sKind = value.Kind() + if sKind == reflect.Interface { + sKind = value.Elem().Kind() + } + switch sKind { + case reflect.Struct: + col.isNested = true + if err = oldIterateStruct(value, col, 0); err != nil { + return err + } + case reflect.Map: + col.isNested = true + if err = oldIterateMap(value, col, 0); err != nil { + return err + } 
+ case reflect.Slice: + if err = parseSlice("", value.Interface(), col, 0); err != nil { + return err + } + default: + // only happens if slice has a primitive mixed with complex types in a []any + return &Error{ + ColumnType: fmt.Sprint(sKind), + Err: fmt.Errorf("slices must be same dimension in column %s", col.Name()), + } + } + } + return nil + } + return parsePrimitive(name, sKind, values, true, jCol, preFill) +} + +func parseStruct(name string, structVal reflect.Value, jCol JSONParent, preFill int) error { + col, err := jCol.upsertObject(name) + if err != nil { + return err + } + return oldIterateStruct(structVal, col, preFill) +} + +func oldIterateStruct(structVal reflect.Value, col JSONParent, preFill int) error { + // structs generally have consistent field counts but we ignore nil values that are any as we can't infer from + // these until they occur - so we might need to either backfill when to do occur or insert empty based on previous + if structVal.Kind() == reflect.Interface { + // can happen if passed from []any + structVal = structVal.Elem() + } + + currentColumns := col.columnNames() + columnLookup := make(map[string]struct{}) + numRows := col.rows() + for _, name := range currentColumns { + columnLookup[name] = struct{}{} + } + addedColumns := make([]string, structVal.NumField(), structVal.NumField()) + newColumn := false + + for i := 0; i < structVal.NumField(); i++ { + fName, omit := getStructFieldName(structVal.Type().Field(i)) + if omit { + continue + } + field := structVal.Field(i) + if !field.CanInterface() { + // can't interface - likely not exported so ignore the field + continue + } + kind := field.Kind() + value := field.Interface() + fType := field.Type() + //resolve underlying kind + if kind == reflect.Interface { + if value == nil { + // ignore nil fields + continue + } + kind = reflect.TypeOf(value).Kind() + field = reflect.ValueOf(value) + fType = field.Type() + } + if _, ok := columnLookup[fName]; !ok && len(currentColumns) > 0 { + // new column - need to handle missing + preFill = numRows + newColumn = true + } + if _, ok := typeMappings[fType.String()]; ok { + if err := parseType(fName, fType, value, false, col, preFill); err != nil { + return err + } + } else { + switch kind { + case reflect.Slice: + if reflect.ValueOf(value).Len() == 0 { + continue + } + if err := parseSlice(fName, value, col, preFill); err != nil { + return err + } + case reflect.Struct: + if err := parseStruct(fName, field, col, preFill); err != nil { + return err + } + case reflect.Map: + if err := parseMap(fName, field, col, preFill); err != nil { + return err + } + default: + if err := parsePrimitive(fName, kind, value, false, col, preFill); err != nil { + return err + } + } + } + addedColumns[i] = fName + if newColumn { + // reset as otherwise prefill overflow to other fields. 
But don't reset if this prefill has come from + // a higher level + preFill = 0 + } + } + // handle missing + missingColumns := difference(currentColumns, addedColumns) + for _, name := range missingColumns { + if err := col.insertEmptyColumn(name); err != nil { + return err + } + } + return nil +} + +func parseMap(name string, mapVal reflect.Value, jCol JSONParent, preFill int) error { + if mapVal.Type().Key().Kind() != reflect.String { + return &Error{ + ColumnType: fmt.Sprint(mapVal.Type().Key().Kind()), + Err: fmt.Errorf("map keys must be string for column %s", name), + } + } + col, err := jCol.upsertObject(name) + if err != nil { + return err + } + return oldIterateMap(mapVal, col, preFill) +} + +func oldIterateMap(mapVal reflect.Value, col JSONParent, preFill int) error { + // maps can have inconsistent numbers of elements - we must ensure they are consistent in the encoding + // two inconsistent options - 1. new - map has new columns 2. massing - map has missing columns + // for (1) we need to update previous, for (2) we need to ensure we add a null entry + if mapVal.Kind() == reflect.Interface { + // can happen if passed from []any + mapVal = mapVal.Elem() + } + + currentColumns := col.columnNames() + //gives us a fast lookup for large maps + columnLookup := make(map[string]struct{}) + numRows := col.rows() + // true if we need nil values + for _, name := range currentColumns { + columnLookup[name] = struct{}{} + } + addedColumns := make([]string, len(mapVal.MapKeys()), len(mapVal.MapKeys())) + newColumn := false + for i, key := range mapVal.MapKeys() { + if newColumn { + // reset as otherwise prefill overflow to other fields. But don't reset if this prefill has come from + // a higher level + preFill = 0 + } + + name := getMapFieldName(key.Interface().(string)) + if _, ok := columnLookup[name]; !ok && len(currentColumns) > 0 { + // new column - need to handle + preFill = numRows + newColumn = true + } + field := mapVal.MapIndex(key) + kind := field.Kind() + fType := field.Type() + + if kind == reflect.Interface { + if field.Interface() == nil { + // ignore nil fields + continue + } + kind = reflect.TypeOf(field.Interface()).Kind() + field = reflect.ValueOf(field.Interface()) + fType = field.Type() + } + if _, ok := typeMappings[fType.String()]; ok { + if err := parseType(name, fType, field.Interface(), false, col, preFill); err != nil { + return err + } + } else { + switch kind { + case reflect.Struct: + if err := parseStruct(name, field, col, preFill); err != nil { + return err + } + case reflect.Slice: + if err := parseSlice(name, field.Interface(), col, preFill); err != nil { + return err + } + case reflect.Map: + if err := parseMap(name, field, col, preFill); err != nil { + return err + } + default: + if err := parsePrimitive(name, kind, field.Interface(), false, col, preFill); err != nil { + return err + } + } + } + addedColumns[i] = name + } + // handle missing + missingColumns := difference(currentColumns, addedColumns) + for _, name := range missingColumns { + if err := col.insertEmptyColumn(name); err != nil { + return err + } + } + return nil +} + +func appendStructOrMap(jCol *JSONObject, data any) error { + vData := reflect.ValueOf(data) + kind := vData.Kind() + if kind == reflect.Struct { + return oldIterateStruct(vData, jCol, 0) + } + if kind == reflect.Map { + if reflect.TypeOf(data).Key().Kind() != reflect.String { + return &Error{ + ColumnType: fmt.Sprint(reflect.TypeOf(data).Key().Kind()), + Err: fmt.Errorf("map keys must be string for column %s", jCol.Name()), + 
} + } + if jCol.columns == nil && vData.Len() == 0 { + // if map is empty, we need to create an empty Tuple to make sure subcolumns protocol is happy + // _dummy is a ClickHouse internal name for empty Tuple subcolumn + // it has the same effect as `INSERT INTO single_json_type_table VALUES ('{}');` + jCol.upsertValue("_dummy", "Int8") + return jCol.insertEmptyColumn("_dummy") + } + return oldIterateMap(vData, jCol, 0) + } + return &UnsupportedColumnTypeError{ + t: Type(fmt.Sprint(kind)), + } +} + +type JSONValue struct { + Interface + // represents the type e.g. uuid - these may have been mapped to a Column type support by JSON e.g. String + origType reflect.Type +} + +func (jCol *JSONValue) Reset() { + jCol.Interface.Reset() +} + +func (jCol *JSONValue) appendEmptyValue() error { + switch jCol.Interface.(type) { + case *Array: + if jCol.Rows() > 0 { + return jCol.AppendRow(reflect.MakeSlice(reflect.TypeOf(jCol.Row(0, false)), 0, 0).Interface()) + } + return &Error{ + ColumnType: "unknown", + Err: fmt.Errorf("can't add empty value to column %s - no entries to infer type", jCol.Name()), + } + default: + // can't just append nil here as we need a custom nil value for the type + if jCol.origType != nil { + return jCol.AppendRow(fmt.Sprint(reflect.New(jCol.origType).Elem().Interface())) + } + return jCol.AppendRow(nil) + } +} + +func (jCol *JSONValue) Type() Type { + return Type(fmt.Sprintf("%s %s", jCol.Name(), jCol.Interface.Type())) +} + +type JSONList struct { + Array + name string + isNested bool // indicates if this a list of objects i.e. a Nested +} + +func (jCol *JSONList) Name() string { + return jCol.name +} + +func (jCol *JSONList) columnNames() []string { + return jCol.Array.values.(*JSONObject).columnNames() +} + +func (jCol *JSONList) rows() int { + return jCol.values.(*JSONObject).Rows() +} + +func createJSONList(name string, tz *time.Location) (jCol *JSONList) { + // lists are represented as Nested which are in turn encoded as Array(Tuple()). 
We thus pass a Array(JSONObject()) + // as this encodes like a tuple + lCol := &JSONList{ + name: name, + } + lCol.values = &JSONObject{tz: tz} + // depth should always be one as nested arrays aren't possible + lCol.depth = 1 + lCol.scanType = scanTypeSlice + offsetScanTypes := []reflect.Type{lCol.scanType} + lCol.offsets = []*offset{{ + scanType: offsetScanTypes[0], + }} + return lCol +} + +func (jCol *JSONList) appendEmptyValue() error { + // only need to bump the offsets + jCol.createNewOffsets(1) + return nil +} + +func (jCol *JSONList) insertEmptyColumn(name string) error { + return jCol.values.(*JSONObject).insertEmptyColumn(name) +} + +func (jCol *JSONList) upsertValue(name string, ct string) (*JSONValue, error) { + // check if column exists and reuse if same type, error if same name and different type + jObj := jCol.values.(*JSONObject) + cols := jObj.columns + for i := range cols { + sCol := cols[i] + if sCol.Name() == name { + vCol, ok := cols[i].(*JSONValue) + if !ok { + sType := cols[i].Type() + return nil, &Error{ + ColumnType: fmt.Sprint(sType), + Err: fmt.Errorf("type mismatch in column %s - expected value, got %s", name, sType), + } + } + tType := vCol.Interface.Type() + if tType != Type(ct) { + return nil, &Error{ + ColumnType: ct, + Err: fmt.Errorf("type mismatch in column %s - expected %s, got %s", name, tType, ct), + } + } + return vCol, nil + } + } + col, err := Type(ct).Column(name, jObj.tz) + if err != nil { + return nil, err + } + vCol := &JSONValue{ + Interface: col, + } + jCol.values.(*JSONObject).columns = append(cols, vCol) // nolint:gocritic + return vCol, nil +} + +func (jCol *JSONList) upsertList(name string) (*JSONList, error) { + // check if column exists and reuse if same type, error if same name and different type + jObj := jCol.values.(*JSONObject) + cols := jCol.values.(*JSONObject).columns + for i := range cols { + sCol := cols[i] + if sCol.Name() == name { + sCol, ok := cols[i].(*JSONList) + if !ok { + return nil, &Error{ + ColumnType: fmt.Sprint(cols[i].Type()), + Err: fmt.Errorf("type mismatch in column %s - expected list, got %s", name, cols[i].Type()), + } + } + return sCol, nil + } + } + lCol := createJSONList(name, jObj.tz) + jCol.values.(*JSONObject).columns = append(cols, lCol) // nolint:gocritic + return lCol, nil + +} + +func (jCol *JSONList) upsertObject(name string) (*JSONObject, error) { + // check if column exists and reuse if same type, error if same name and different type + jObj := jCol.values.(*JSONObject) + cols := jObj.columns + for i := range cols { + sCol := cols[i] + if sCol.Name() == name { + sCol, ok := cols[i].(*JSONObject) + if !ok { + sType := cols[i].Type() + return nil, &Error{ + ColumnType: fmt.Sprint(sType), + Err: fmt.Errorf("type mismatch in column %s, expected object got %s", name, sType), + } + } + return sCol, nil + } + } + // lists are represented as Nested which are in turn encoded as Array(Tuple()). 
We thus pass a Array(JSONObject()) + // as this encodes like a tuple + oCol := &JSONObject{ + name: name, + tz: jObj.tz, + } + jCol.values.(*JSONObject).columns = append(cols, oCol) // nolint:gocritic + return oCol, nil +} + +func (jCol *JSONList) Type() Type { + cols := jCol.values.(*JSONObject).columns + subTypes := make([]string, len(cols)) + for i, v := range cols { + subTypes[i] = string(v.Type()) + } + // can be a list of lists or a nested + if jCol.isNested { + return Type(fmt.Sprintf("%s Nested(%s)", jCol.name, strings.Join(subTypes, ", "))) + } + return Type(fmt.Sprintf("%s Array(%s)", jCol.name, strings.Join(subTypes, ", "))) +} + +type JSONObject struct { + columns []ObjectJSON + name string + root bool + encoding uint8 + tz *time.Location +} + +func (jCol *JSONObject) Reset() { + for i := range jCol.columns { + jCol.columns[i].Reset() + } +} + +func (jCol *JSONObject) Name() string { + return jCol.name +} + +func (jCol *JSONObject) columnNames() []string { + columns := make([]string, len(jCol.columns), len(jCol.columns)) + for i := range jCol.columns { + columns[i] = jCol.columns[i].Name() + } + return columns +} + +func (jCol *JSONObject) rows() int { + return jCol.Rows() +} + +func (jCol *JSONObject) appendEmptyValue() error { + for i := range jCol.columns { + if err := jCol.columns[i].appendEmptyValue(); err != nil { + return err + } + } + return nil +} + +func (jCol *JSONObject) insertEmptyColumn(name string) error { + for i := range jCol.columns { + if jCol.columns[i].Name() == name { + if err := jCol.columns[i].appendEmptyValue(); err != nil { + return err + } + return nil + } + } + return &Error{ + ColumnType: "unknown", + Err: fmt.Errorf("column %s is missing - empty value cannot be appended", name), + } +} + +func (jCol *JSONObject) upsertValue(name string, ct string) (*JSONValue, error) { + for i := range jCol.columns { + sCol := jCol.columns[i] + if sCol.Name() == name { + vCol, ok := jCol.columns[i].(*JSONValue) + if !ok { + sType := jCol.columns[i].Type() + return nil, &Error{ + ColumnType: fmt.Sprint(sType), + Err: fmt.Errorf("type mismatch in column %s, expected value got %s", name, sType), + } + } + if vCol.Interface.Type() != Type(ct) { + return nil, &Error{ + ColumnType: ct, + Err: fmt.Errorf("type mismatch in column %s, expected %s got %s", name, vCol.Interface.Type(), ct), + } + } + return vCol, nil + } + } + col, err := Type(ct).Column(name, jCol.tz) + if err != nil { + return nil, err + } + vCol := &JSONValue{ + Interface: col, + } + jCol.columns = append(jCol.columns, vCol) + return vCol, nil +} + +func (jCol *JSONObject) upsertList(name string) (*JSONList, error) { + for i := range jCol.columns { + sCol := jCol.columns[i] + if sCol.Name() == name { + sCol, ok := jCol.columns[i].(*JSONList) + if !ok { + sType := jCol.columns[i].Type() + return nil, &Error{ + ColumnType: fmt.Sprint(sType), + Err: fmt.Errorf("type mismatch in column %s, expected list got %s", name, sType), + } + } + return sCol, nil + } + } + lCol := createJSONList(name, jCol.tz) + jCol.columns = append(jCol.columns, lCol) + return lCol, nil +} + +func (jCol *JSONObject) upsertObject(name string) (*JSONObject, error) { + // check if it exists + for i := range jCol.columns { + sCol := jCol.columns[i] + if sCol.Name() == name { + sCol, ok := jCol.columns[i].(*JSONObject) + if !ok { + sType := jCol.columns[i].Type() + return nil, &Error{ + ColumnType: fmt.Sprint(sType), + Err: fmt.Errorf("type mismatch in column %s, expected object got %s", name, sType), + } + } + return sCol, nil + } + } + // 
not present so create + oCol := &JSONObject{ + name: name, + tz: jCol.tz, + } + jCol.columns = append(jCol.columns, oCol) + return oCol, nil +} + +func (jCol *JSONObject) Type() Type { + if jCol.root { + return "Object('json')" + } + return jCol.FullType() +} + +func (jCol *JSONObject) FullType() Type { + subTypes := make([]string, len(jCol.columns)) + for i, v := range jCol.columns { + subTypes[i] = string(v.Type()) + } + if jCol.root { + return Type(fmt.Sprintf("Tuple(%s)", strings.Join(subTypes, ", "))) + } + return Type(fmt.Sprintf("%s Tuple(%s)", jCol.name, strings.Join(subTypes, ", "))) +} + +func (jCol *JSONObject) ScanType() reflect.Type { + return scanTypeMap +} + +func (jCol *JSONObject) Rows() int { + if len(jCol.columns) != 0 { + return jCol.columns[0].Rows() + } + return 0 +} + +// ClickHouse returns JSON as a tuple i.e. these will never be invoked + +func (jCol *JSONObject) Row(i int, ptr bool) any { + panic("Not implemented") +} + +func (jCol *JSONObject) ScanRow(dest any, row int) error { + panic("Not implemented") +} + +func (jCol *JSONObject) Append(v any) (nulls []uint8, err error) { + jSlice := reflect.ValueOf(v) + if jSlice.Kind() != reflect.Slice { + return nil, &ColumnConverterError{ + Op: "Append", + To: string(jCol.Type()), + From: fmt.Sprintf("slice of structs/map or strings required - received %T", v), + } + } + for i := 0; i < jSlice.Len(); i++ { + if err := jCol.AppendRow(jSlice.Index(i).Interface()); err != nil { + return nil, err + } + } + return nil, nil +} + +func (jCol *JSONObject) AppendRow(v any) error { + if reflect.ValueOf(v).Kind() == reflect.Struct || reflect.ValueOf(v).Kind() == reflect.Map { + if jCol.columns != nil && jCol.encoding == 1 { + return &Error{ + ColumnType: fmt.Sprint(jCol.Type()), + Err: fmt.Errorf("encoding of JSON columns cannot be mixed in a batch - %s cannot be added as previously String", reflect.ValueOf(v).Kind()), + } + } + err := appendStructOrMap(jCol, v) + return err + } + switch v := v.(type) { + case string: + if jCol.columns != nil && jCol.encoding == 0 { + return &Error{ + ColumnType: fmt.Sprint(jCol.Type()), + Err: fmt.Errorf("encoding of JSON columns cannot be mixed in a batch - %s cannot be added as previously Struct/Map", reflect.ValueOf(v).Kind()), + } + } + jCol.encoding = 1 + if jCol.columns == nil { + jCol.columns = append(jCol.columns, &JSONValue{Interface: &String{}}) + } + jCol.columns[0].AppendRow(v) + default: + return &ColumnConverterError{ + Op: "AppendRow", + To: "String", + From: fmt.Sprintf("json row must be struct, map or string - received %T", v), + } + } + return nil +} + +func (jCol *JSONObject) Decode(reader *proto.Reader, rows int) error { + panic("Not implemented") +} + +func (jCol *JSONObject) Encode(buffer *proto.Buffer) { + if jCol.root && jCol.encoding == 0 { + buffer.PutString(string(jCol.FullType())) + } + for _, c := range jCol.columns { + c.Encode(buffer) + } +} + +func (jCol *JSONObject) ReadStatePrefix(reader *proto.Reader) error { + _, err := reader.UInt8() + return err +} + +func (jCol *JSONObject) WriteStatePrefix(buffer *proto.Buffer) error { + buffer.PutUInt8(jCol.encoding) + return nil +} + +var ( + _ Interface = (*JSONObject)(nil) + _ CustomSerialization = (*JSONObject)(nil) +) diff --git a/lib/column/string.go b/lib/column/string.go index 5ce480b0e6..79b86e7005 100644 --- a/lib/column/string.go +++ b/lib/column/string.go @@ -21,6 +21,7 @@ import ( "database/sql" "database/sql/driver" "encoding" + "encoding/json" "fmt" "github.com/ClickHouse/ch-go/proto" "reflect" @@ -71,6 +72,11 
@@ func (col *String) ScanRow(dest any, row int) error { **d = val case *sql.NullString: return d.Scan(val) + case *json.RawMessage: + *d = json.RawMessage(val) + case **json.RawMessage: + *d = new(json.RawMessage) + **d = json.RawMessage(val) case encoding.BinaryUnmarshaler: return d.UnmarshalBinary(binary.Str2Bytes(val, len(val))) default: @@ -111,6 +117,10 @@ func (col *String) AppendRow(v any) error { default: col.col.Append("") } + case json.RawMessage: + col.col.AppendBytes(v) + case *json.RawMessage: + col.col.AppendBytes(*v) case []byte: col.col.AppendBytes(v) case nil: @@ -171,6 +181,16 @@ func (col *String) Append(v any) (nulls []uint8, err error) { } col.AppendRow(v[i]) } + case []json.RawMessage: + nulls = make([]uint8, len(v)) + for i := range v { + col.col.Append(string(v[i])) + } + case []*json.RawMessage: + nulls = make([]uint8, len(v)) + for i := range v { + col.col.Append(string(*v[i])) + } case [][]byte: nulls = make([]uint8, len(v)) for i := range v { diff --git a/tests/json_test.go b/tests/json_test.go new file mode 100644 index 0000000000..77ba0ffbe0 --- /dev/null +++ b/tests/json_test.go @@ -0,0 +1,384 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
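// Illustrative sketch, not part of this change: the String column additions above allow
// json.RawMessage values to be appended and scanned directly, without re-encoding. The table
// name here is hypothetical; any String column behaves the same way.
package clickhouse_api_sketch

import (
	"context"
	"encoding/json"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// rawMessageRoundTrip stores a raw JSON document in a String column and reads it back
// byte-for-byte as json.RawMessage.
func rawMessageRoundTrip(ctx context.Context, conn driver.Conn) (json.RawMessage, error) {
	if err := conn.Exec(ctx, "CREATE TABLE IF NOT EXISTS raw_docs (doc String) ENGINE = Memory"); err != nil {
		return nil, err
	}

	batch, err := conn.PrepareBatch(ctx, "INSERT INTO raw_docs (doc)")
	if err != nil {
		return nil, err
	}

	in := json.RawMessage(`{"id":1234,"name":"Book","tags":["library","fiction"]}`)
	if err := batch.Append(in); err != nil {
		return nil, err
	}
	if err := batch.Send(); err != nil {
		return nil, err
	}

	var out json.RawMessage
	err = conn.QueryRow(ctx, "SELECT doc FROM raw_docs").Scan(&out)
	return out, err
}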
+ +package tests + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/chcol" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/stretchr/testify/require" +) + +var jsonTestDate, _ = time.Parse(time.RFC3339, "2024-12-13T02:09:30.123Z") + +func setupJSONTest(t *testing.T) driver.Conn { + conn, err := GetNativeConnection(clickhouse.Settings{ + "max_execution_time": 60, + "allow_experimental_json_type": true, + }, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + require.NoError(t, err) + + if !CheckMinServerServerVersion(conn, 24, 9, 0) { + t.Skip(fmt.Errorf("unsupported clickhouse version for JSON type")) + return nil + } + + return conn +} + +func TestJSONPaths(t *testing.T) { + ctx := context.Background() + conn := setupJSONTest(t) + + const ddl = ` + CREATE TABLE IF NOT EXISTS test_json ( + c JSON(Name String, Age Int64, KeysNumbers Map(String, Int64), SKIP fake.field) + ) Engine = MergeTree() ORDER BY tuple() + ` + require.NoError(t, conn.Exec(ctx, ddl)) + defer func() { + require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_json")) + }() + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_json (c)") + require.NoError(t, err) + + jsonRow := chcol.NewJSON() + jsonRow.SetValueAtPath("Name", "JSON") + jsonRow.SetValueAtPath("Age", int64(42)) + jsonRow.SetValueAtPath("Active", true) + jsonRow.SetValueAtPath("Score", 3.14) + jsonRow.SetValueAtPath("Tags", []string{"a", "b"}) + jsonRow.SetValueAtPath("Numbers", []int64{20, 40}) + jsonRow.SetValueAtPath("Address.Street", "Street") + jsonRow.SetValueAtPath("Address.City", "City") + jsonRow.SetValueAtPath("Address.Country", "Country") + jsonRow.SetValueAtPath("KeysNumbers", map[string]int64{"FieldA": 42, "FieldB": 32}) + jsonRow.SetValueAtPath("Metadata.FieldA", "a") + jsonRow.SetValueAtPath("Metadata.FieldB", "b") + jsonRow.SetValueAtPath("Metadata.FieldC.FieldD", "d") + jsonRow.SetValueAtPath("Timestamp", jsonTestDate) + jsonRow.SetValueAtPath("DynamicString", clickhouse.NewDynamic("str")) + jsonRow.SetValueAtPath("DynamicInt", clickhouse.NewDynamic(int64(48))) + jsonRow.SetValueAtPath("DynamicMap", clickhouse.NewDynamic(map[string]string{"a": "a", "b": "b"})) + + require.NoError(t, batch.Append(jsonRow)) + require.NoError(t, batch.Send()) + + rows, err := conn.Query(ctx, "SELECT c FROM test_json") + require.NoError(t, err) + + var row chcol.JSON + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + + expectedValuesByPath := jsonRow.ValuesByPath() + actualValuesByPath := row.ValuesByPath() + for path, expectedValue := range expectedValuesByPath { + actualValue, ok := actualValuesByPath[path] + if !ok { + t.Fatalf("result JSON is missing path: %s", path) + } + + // Allow Equal func to compare values without Dynamic wrapper + if v, ok := expectedValue.(clickhouse.Dynamic); ok { + expectedValue = v.Any() + } + + if v, ok := actualValue.(clickhouse.Dynamic); ok { + actualValue = v.Any() + } + + require.Equal(t, expectedValue, actualValue) + } +} + +type Address struct { + Street string `chType:"String"` + City string `chType:"String"` + Country string `chType:"String"` +} + +type TestStruct struct { + Name string + Age int64 + Active bool + Score float64 + + Tags []string + Numbers []int64 + + Address Address + + KeysNumbers map[string]int64 + Metadata map[string]interface{} + + Timestamp time.Time `chType:"DateTime64(3)"` + + DynamicString chcol.Dynamic + 
DynamicInt chcol.Dynamic + DynamicMap chcol.Dynamic +} + +func TestJSONStruct(t *testing.T) { + ctx := context.Background() + conn := setupJSONTest(t) + + const ddl = ` + CREATE TABLE IF NOT EXISTS test_json ( + c JSON(Name String, Age Int64, KeysNumbers Map(String, Int64), SKIP fake.field) + ) Engine = MergeTree() ORDER BY tuple() + ` + require.NoError(t, conn.Exec(ctx, ddl)) + defer func() { + require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_json")) + }() + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_json (c)") + require.NoError(t, err) + + inputRow := TestStruct{ + Name: "JSON", + Age: 42, + Active: true, + Score: 3.14, + Tags: []string{"a", "b"}, + Numbers: []int64{20, 40}, + Address: Address{ + Street: "Street", + City: "City", + Country: "Country", + }, + KeysNumbers: map[string]int64{"FieldA": 42, "FieldB": 32}, + Metadata: map[string]interface{}{ + "FieldA": "a", + "FieldB": "b", + "FieldC": map[string]interface{}{ + "FieldD": "d", + }, + }, + Timestamp: jsonTestDate, + DynamicString: chcol.NewDynamic("str").WithType("String"), + DynamicInt: chcol.NewDynamic(int64(48)).WithType("Int64"), + DynamicMap: chcol.NewDynamic(map[string]string{"a": "a", "b": "b"}).WithType("Map(String, String)"), + } + require.NoError(t, batch.Append(inputRow)) + + inputRow2 := TestStruct{ + KeysNumbers: map[string]int64{}, + Timestamp: jsonTestDate, + Metadata: map[string]interface{}{ + "FieldA": "a", + "FieldB": "b", + "FieldC": map[string]interface{}{ + "FieldD": int64(5), + }, + "FieldE": map[string]interface{}{ + "FieldF": "f", + }, + }, + } + require.NoError(t, batch.Append(inputRow2)) + + require.NoError(t, batch.Send()) + + rows, err := conn.Query(ctx, "SELECT c FROM test_json") + require.NoError(t, err) + + var row TestStruct + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + // The second row adds a nil value at this path. 
Update the inputRow for easier deep equal check + inputRow.Metadata["FieldE"] = map[string]interface{}{ + "FieldF": nil, + } + require.Equal(t, inputRow, row) + + var row2 TestStruct + + require.True(t, rows.Next()) + err = rows.Scan(&row2) + require.NoError(t, err) + // Init slices for easier comparison + inputRow2.Tags = make([]string, 0) + inputRow2.Numbers = make([]int64, 0) + require.Equal(t, inputRow2, row2) +} + +func TestJSONString(t *testing.T) { + t.Skip("client cannot receive JSON strings") + + ctx := context.Background() + conn := setupJSONTest(t) + + require.NoError(t, conn.Exec(ctx, "SET output_format_native_write_json_as_string=1")) + + const ddl = ` + CREATE TABLE IF NOT EXISTS test_json ( + c JSON(Name String, Age Int64, KeysNumbers Map(String, Int64), SKIP fake.field) + ) Engine = MergeTree() ORDER BY tuple() + ` + require.NoError(t, conn.Exec(ctx, ddl)) + defer func() { + require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_json")) + }() + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_json (c)") + require.NoError(t, err) + + inputRow := TestStruct{ + Name: "JSON", + Age: 42, + Active: true, + Score: 3.14, + Tags: []string{"a", "b"}, + Numbers: []int64{20, 40}, + Address: Address{ + Street: "Street", + City: "City", + Country: "Country", + }, + KeysNumbers: map[string]int64{"FieldA": 42, "FieldB": 32}, + Metadata: map[string]interface{}{ + "FieldA": "a", + "FieldB": "b", + "FieldC": map[string]interface{}{ + "FieldD": "d", + }, + }, + Timestamp: jsonTestDate, + DynamicString: chcol.NewDynamic("str").WithType("String"), + DynamicInt: chcol.NewDynamic(int64(48)).WithType("Int64"), + DynamicMap: chcol.NewDynamic(map[string]string{"a": "a", "b": "b"}).WithType("Map(String, String)"), + } + + inputRowStr, err := json.Marshal(inputRow) + require.NoError(t, err) + require.NoError(t, batch.Append(inputRowStr)) + require.NoError(t, batch.Send()) + + rows, err := conn.Query(ctx, "SELECT c FROM test_json") + require.NoError(t, err) + + var row json.RawMessage + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + + require.Equal(t, string(inputRowStr), string(row)) + + var rowStruct TestStruct + err = json.Unmarshal(row, &rowStruct) + require.NoError(t, err) +} + +func TestJSON_BatchFlush(t *testing.T) { + t.Skip(fmt.Errorf("server-side JSON bug")) + + ctx := context.Background() + conn := setupJSONTest(t) + + const ddl = ` + CREATE TABLE IF NOT EXISTS test_json ( + c JSON + ) Engine = MergeTree() ORDER BY tuple() + ` + require.NoError(t, conn.Exec(ctx, ddl)) + defer func() { + require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_json")) + }() + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_json (c)") + require.NoError(t, err) + + vals := make([]*clickhouse.JSON, 0, 1000) + for i := 0; i < 1000; i++ { + row := clickhouse.NewJSON() + if i%2 == 0 { + row.SetValueAtPath("a", int64(i)) + row.SetValueAtPath("b", i%5 == 0) + } else { + row.SetValueAtPath("c", int64(-i)) + row.SetValueAtPath("d", i%5 != 0) + } + + vals = append(vals, row) + require.NoError(t, batch.Append(vals[i])) + require.NoError(t, batch.Flush()) + } + require.NoError(t, batch.Send()) + + rows, err := conn.Query(ctx, "SELECT c FROM test_json") + require.NoError(t, err) + + i := 0 + for rows.Next() { + var row clickhouse.JSON + err = rows.Scan(&row) + require.NoError(t, err) + + if i%2 == 0 { + valA, ok := row.ValueAtPath("a") + require.Equal(t, true, ok) + _, ok = valA.(clickhouse.Dynamic) + require.Equal(t, true, ok) + + require.Equal(t, int64(i), 
valA.(clickhouse.Dynamic).Any()) + require.Equal(t, "Int64", valA.(clickhouse.Dynamic).Type()) + + valB, ok := row.ValueAtPath("b") + require.Equal(t, true, ok) + _, ok = valB.(clickhouse.Dynamic) + require.Equal(t, true, ok) + + require.Equal(t, i%5 == 0, valB.(clickhouse.Dynamic).Any()) + require.Equal(t, "Bool", valB.(clickhouse.Dynamic).Type()) + } else { + valC, ok := row.ValueAtPath("c") + require.Equal(t, true, ok) + _, ok = valC.(clickhouse.Dynamic) + require.Equal(t, true, ok) + + require.Equal(t, int64(-i), valC.(clickhouse.Dynamic).Any()) + require.Equal(t, "Int64", valC.(clickhouse.Dynamic).Type()) + + valD, ok := row.ValueAtPath("d") + require.Equal(t, true, ok) + _, ok = valD.(clickhouse.Dynamic) + require.Equal(t, true, ok) + + require.Equal(t, i%5 != 0, valD.(clickhouse.Dynamic).Any()) + require.Equal(t, "Bool", valD.(clickhouse.Dynamic).Type()) + } + + i++ + } +} diff --git a/tests/std/json_test.go b/tests/std/json_test.go new file mode 100644 index 0000000000..b9f2b7ee08 --- /dev/null +++ b/tests/std/json_test.go @@ -0,0 +1,328 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
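+
+// These tests exercise the experimental JSON column type over the database/sql
+// (std) interface: path-based values, struct serialization, and raw JSON strings.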
+ +package std + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/chcol" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +var jsonTestDate, _ = time.Parse(time.RFC3339, "2024-12-13T02:09:30.123Z") + +func setupJSONTest(t *testing.T) *sql.DB { + conn, err := GetStdOpenDBConnection(clickhouse.Native, nil, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + require.NoError(t, err) + + if !CheckMinServerVersion(conn, 24, 9, 0) { + t.Skip(fmt.Errorf("unsupported clickhouse version for JSON type")) + return nil + } + + _, err = conn.ExecContext(context.Background(), "SET allow_experimental_json_type = 1") + if err != nil { + t.Fatal(err) + return nil + } + + return conn +} + +func TestJSONPaths(t *testing.T) { + ctx := context.Background() + conn := setupJSONTest(t) + + const ddl = ` + CREATE TABLE IF NOT EXISTS test_json ( + c JSON(Name String, Age Int64, KeysNumbers Map(String, Int64), SKIP fake.field) + ) Engine = MergeTree() ORDER BY tuple() + ` + _, err := conn.ExecContext(ctx, ddl) + require.NoError(t, err) + defer func() { + _, err := conn.ExecContext(ctx, "DROP TABLE IF EXISTS test_json") + require.NoError(t, err) + }() + + tx, err := conn.BeginTx(ctx, nil) + require.NoError(t, err) + + batch, err := tx.PrepareContext(ctx, "INSERT INTO test_json (c)") + require.NoError(t, err) + + jsonRow := chcol.NewJSON() + jsonRow.SetValueAtPath("Name", "JSON") + jsonRow.SetValueAtPath("Age", int64(42)) + jsonRow.SetValueAtPath("Active", true) + jsonRow.SetValueAtPath("Score", 3.14) + jsonRow.SetValueAtPath("Tags", []string{"a", "b"}) + jsonRow.SetValueAtPath("Numbers", []int64{20, 40}) + jsonRow.SetValueAtPath("Address.Street", "Street") + jsonRow.SetValueAtPath("Address.City", "City") + jsonRow.SetValueAtPath("Address.Country", "Country") + jsonRow.SetValueAtPath("KeysNumbers", map[string]int64{"FieldA": 42, "FieldB": 32}) + jsonRow.SetValueAtPath("Metadata.FieldA", "a") + jsonRow.SetValueAtPath("Metadata.FieldB", "b") + jsonRow.SetValueAtPath("Metadata.FieldC.FieldD", "d") + jsonRow.SetValueAtPath("Timestamp", jsonTestDate) + jsonRow.SetValueAtPath("DynamicString", clickhouse.NewDynamic("str")) + jsonRow.SetValueAtPath("DynamicInt", clickhouse.NewDynamic(int64(48))) + jsonRow.SetValueAtPath("DynamicMap", clickhouse.NewDynamic(map[string]string{"a": "a", "b": "b"})) + + _, err = batch.ExecContext(ctx, jsonRow) + require.NoError(t, err) + + require.NoError(t, tx.Commit()) + + rows, err := conn.QueryContext(ctx, "SELECT c FROM test_json") + require.NoError(t, err) + + var row chcol.JSON + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + + expectedValuesByPath := jsonRow.ValuesByPath() + actualValuesByPath := row.ValuesByPath() + for path, expectedValue := range expectedValuesByPath { + actualValue, ok := actualValuesByPath[path] + if !ok { + t.Fatalf("result JSON is missing path: %s", path) + } + + // Allow Equal func to compare values without Dynamic wrapper + if v, ok := expectedValue.(clickhouse.Dynamic); ok { + expectedValue = v.Any() + } + + if v, ok := actualValue.(clickhouse.Dynamic); ok { + actualValue = v.Any() + } + + require.Equal(t, expectedValue, actualValue) + } +} + +type Address struct { + Street string `chType:"String"` + City string `chType:"String"` + Country string `chType:"String"` +} + +type TestStruct struct { + Name string + Age int64 + Active bool + Score float64 + + Tags []string + Numbers []int64 + 
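+ // The remaining fields exercise nested structs, maps, a chType-tagged
+ // timestamp, and Dynamic values; they correspond to the dotted JSON paths
+ // (e.g. Address.Street, Metadata.FieldC.FieldD) written explicitly in
+ // TestJSONPaths above.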
+ Address Address + + KeysNumbers map[string]int64 + Metadata map[string]interface{} + + Timestamp time.Time `chType:"DateTime64(3)"` + + DynamicString chcol.Dynamic + DynamicInt chcol.Dynamic + DynamicMap chcol.Dynamic +} + +func TestJSONStruct(t *testing.T) { + t.Skip("std Scan skips struct reflection, so JSON values cannot be scanned into a struct") + + ctx := context.Background() + conn := setupJSONTest(t) + + const ddl = ` + CREATE TABLE IF NOT EXISTS test_json ( + c JSON(Name String, Age Int64, KeysNumbers Map(String, Int64), SKIP fake.field) + ) Engine = MergeTree() ORDER BY tuple() + ` + _, err := conn.ExecContext(ctx, ddl) + require.NoError(t, err) + defer func() { + _, err := conn.ExecContext(ctx, "DROP TABLE IF EXISTS test_json") + require.NoError(t, err) + }() + + tx, err := conn.BeginTx(ctx, nil) + require.NoError(t, err) + + batch, err := tx.PrepareContext(ctx, "INSERT INTO test_json (c)") + require.NoError(t, err) + + inputRow := TestStruct{ + Name: "JSON", + Age: 42, + Active: true, + Score: 3.14, + Tags: []string{"a", "b"}, + Numbers: []int64{20, 40}, + Address: Address{ + Street: "Street", + City: "City", + Country: "Country", + }, + KeysNumbers: map[string]int64{"FieldA": 42, "FieldB": 32}, + Metadata: map[string]interface{}{ + "FieldA": "a", + "FieldB": "b", + "FieldC": map[string]interface{}{ + "FieldD": "d", + }, + }, + Timestamp: jsonTestDate, + DynamicString: chcol.NewDynamic("str").WithType("String"), + DynamicInt: chcol.NewDynamic(int64(48)).WithType("Int64"), + DynamicMap: chcol.NewDynamic(map[string]string{"a": "a", "b": "b"}).WithType("Map(String, String)"), + } + _, err = batch.ExecContext(ctx, inputRow) + require.NoError(t, err) + + inputRow2 := TestStruct{ + KeysNumbers: map[string]int64{}, + Timestamp: jsonTestDate, + Metadata: map[string]interface{}{ + "FieldA": "a", + "FieldB": "b", + "FieldC": map[string]interface{}{ + "FieldD": int64(5), + }, + "FieldE": map[string]interface{}{ + "FieldF": "f", + }, + }, + } + _, err = batch.ExecContext(ctx, inputRow2) + require.NoError(t, err) + + require.NoError(t, tx.Commit()) + + rows, err := conn.QueryContext(ctx, "SELECT c FROM test_json") + require.NoError(t, err) + + var row TestStruct + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + // The second row adds a nil value at this path.
Update the inputRow for easier deep equal check + inputRow.Metadata["FieldE"] = map[string]interface{}{ + "FieldF": nil, + } + require.Equal(t, inputRow, row) + + var row2 TestStruct + + require.True(t, rows.Next()) + err = rows.Scan(&row2) + require.NoError(t, err) + // Init slices for easier comparison + inputRow2.Tags = make([]string, 0) + inputRow2.Numbers = make([]int64, 0) + require.Equal(t, inputRow2, row2) +} + +func TestJSONString(t *testing.T) { + t.Skip("client cannot receive JSON strings") + + ctx := context.Background() + conn := setupJSONTest(t) + + _, err := conn.ExecContext(ctx, "SET output_format_native_write_json_as_string = 1") + require.NoError(t, err) + + const ddl = ` + CREATE TABLE IF NOT EXISTS test_json ( + c JSON(Name String, Age Int64, KeysNumbers Map(String, Int64), SKIP fake.field) + ) Engine = MergeTree() ORDER BY tuple() + ` + _, err = conn.ExecContext(ctx, ddl) + require.NoError(t, err) + defer func() { + _, err := conn.ExecContext(ctx, "DROP TABLE IF EXISTS test_json") + require.NoError(t, err) + }() + + tx, err := conn.BeginTx(ctx, nil) + require.NoError(t, err) + + batch, err := tx.PrepareContext(ctx, "INSERT INTO test_json (c)") + require.NoError(t, err) + + inputRow := TestStruct{ + Name: "JSON", + Age: 42, + Active: true, + Score: 3.14, + Tags: []string{"a", "b"}, + Numbers: []int64{20, 40}, + Address: Address{ + Street: "Street", + City: "City", + Country: "Country", + }, + KeysNumbers: map[string]int64{"FieldA": 42, "FieldB": 32}, + Metadata: map[string]interface{}{ + "FieldA": "a", + "FieldB": "b", + "FieldC": map[string]interface{}{ + "FieldD": "d", + }, + }, + Timestamp: jsonTestDate, + DynamicString: chcol.NewDynamic("str").WithType("String"), + DynamicInt: chcol.NewDynamic(int64(48)).WithType("Int64"), + DynamicMap: chcol.NewDynamic(map[string]string{"a": "a", "b": "b"}).WithType("Map(String, String)"), + } + + inputRowStr, err := json.Marshal(inputRow) + require.NoError(t, err) + + _, err = batch.ExecContext(ctx, inputRowStr) + require.NoError(t, err) + + require.NoError(t, tx.Commit()) + + rows, err := conn.QueryContext(ctx, "SELECT c FROM test_json") + require.NoError(t, err) + + var row json.RawMessage + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + + require.Equal(t, string(inputRowStr), string(row)) + + var rowStruct TestStruct + err = json.Unmarshal(row, &rowStruct) + require.NoError(t, err) +}
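+
+// assertJSONPathsEqual is a minimal sketch showing how the path-by-path
+// comparison from TestJSONPaths could be factored into a shared helper. The
+// helper name is illustrative and is not referenced by the tests above; it
+// relies only on APIs already used in this file: JSON.ValuesByPath and
+// Dynamic.Any. Hypothetical usage after scanning: assertJSONPathsEqual(t, jsonRow, &row).
+func assertJSONPathsEqual(t *testing.T, expected, actual *chcol.JSON) {
+	t.Helper()
+
+	expectedValuesByPath := expected.ValuesByPath()
+	actualValuesByPath := actual.ValuesByPath()
+	for path, expectedValue := range expectedValuesByPath {
+		actualValue, ok := actualValuesByPath[path]
+		if !ok {
+			t.Fatalf("result JSON is missing path: %s", path)
+		}
+
+		// Unwrap Dynamic values so require.Equal compares the underlying data.
+		if v, ok := expectedValue.(clickhouse.Dynamic); ok {
+			expectedValue = v.Any()
+		}
+		if v, ok := actualValue.(clickhouse.Dynamic); ok {
+			actualValue = v.Any()
+		}
+
+		require.Equal(t, expectedValue, actualValue, "path %s", path)
+	}
+}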