From 4341368b001bf966cc00ab05672326c34e4cb346 Mon Sep 17 00:00:00 2001 From: Haonan Date: Fri, 27 Sep 2024 19:24:00 +0800 Subject: [PATCH] Support new data types (#107) --- client/field.go | 6 ++ client/field_test.go | 89 +++++++++++++++++++++++ client/protocol.go | 54 +++++++------- client/rpcdataset.go | 83 +++++++++++++++++----- client/rpcdataset_test.go | 81 ++++++++++++++++----- client/session.go | 85 ++++++---------------- client/tablet.go | 66 +++++++++++------ client/tablet_test.go | 142 +++++++++++++++++++++++++++++++++++-- client/utils.go | 66 ++++++++++++++--- client/utils_test.go | 36 ++++++++-- example/session_example.go | 4 +- test/e2e/e2e_test.go | 11 +-- 12 files changed, 551 insertions(+), 172 deletions(-) diff --git a/client/field.go b/client/field.go index 3e79445..4643fe6 100644 --- a/client/field.go +++ b/client/field.go @@ -19,6 +19,8 @@ package client +import "time" + type Field struct { dataType TSDataType name string @@ -89,6 +91,10 @@ func (f *Field) GetText() string { return float64ToString(f.value.(float64)) case string: return f.value.(string) + case []byte: + return bytesToHexString(f.value.([]byte)) + case time.Time: + return f.value.(time.Time).Format("2006-01-02") } return "" } diff --git a/client/field_test.go b/client/field_test.go index 859d8d5..9f49ed6 100644 --- a/client/field_test.go +++ b/client/field_test.go @@ -22,6 +22,7 @@ package client import ( "reflect" "testing" + "time" ) func TestField_IsNull(t *testing.T) { @@ -126,6 +127,38 @@ func TestField_GetDataType(t *testing.T) { value: nil, }, want: TEXT, + }, { + name: "GetDataType-STRING", + fields: fields{ + dataType: STRING, + name: "", + value: nil, + }, + want: STRING, + }, { + name: "GetDataType-BLOB", + fields: fields{ + dataType: BLOB, + name: "", + value: nil, + }, + want: BLOB, + }, { + name: "GetDataType-TIMESTAMP", + fields: fields{ + dataType: TIMESTAMP, + name: "", + value: nil, + }, + want: TIMESTAMP, + }, { + name: "GetDataType-DATE", + fields: fields{ + dataType: DATE, + name: "", + value: nil, + }, + want: DATE, }, } for _, tt := range tests { @@ -201,6 +234,38 @@ func TestField_GetValue(t *testing.T) { value: "TEXT", }, want: "TEXT", + }, { + name: "GetValue-STRING", + fields: fields{ + dataType: STRING, + name: "", + value: "STRING", + }, + want: "STRING", + }, { + name: "GetValue-BLOB", + fields: fields{ + dataType: BLOB, + name: "", + value: []byte("BLOB"), + }, + want: []byte("BLOB"), + }, { + name: "GetValue-TIMESTAMP", + fields: fields{ + dataType: TIMESTAMP, + name: "", + value: int64(65535), + }, + want: int64(65535), + }, { + name: "GetValue-DATE", + fields: fields{ + dataType: DATE, + name: "", + value: time.Date(2024, time.Month(4), 1, 0, 0, 0, 0, time.UTC), + }, + want: time.Date(2024, time.Month(4), 1, 0, 0, 0, 0, time.UTC), }, } for _, tt := range tests { @@ -408,6 +473,30 @@ func TestField_GetText(t *testing.T) { value: int32(1), }, want: "1", + }, { + name: "GetText-04", + fields: fields{ + dataType: STRING, + name: "", + value: "STRING", + }, + want: "STRING", + }, { + name: "GetText-05", + fields: fields{ + dataType: BLOB, + name: "", + value: []byte("BLOB"), + }, + want: "0x424c4f42", + }, { + name: "GetText-06", + fields: fields{ + dataType: DATE, + name: "", + value: time.Date(2024, time.Month(4), 1, 0, 0, 0, 0, time.UTC), + }, + want: "2024-04-01", }, } for _, tt := range tests { diff --git a/client/protocol.go b/client/protocol.go index 883f9ae..edc211e 100644 --- a/client/protocol.go +++ b/client/protocol.go @@ -26,42 +26,42 @@ type TSEncoding uint8 type TSCompressionType uint8 const ( - UNKNOWN TSDataType = -1 - BOOLEAN TSDataType = 0 - INT32 TSDataType = 1 - INT64 TSDataType = 2 - FLOAT TSDataType = 3 - DOUBLE TSDataType = 4 - TEXT TSDataType = 5 + UNKNOWN TSDataType = -1 + BOOLEAN TSDataType = 0 + INT32 TSDataType = 1 + INT64 TSDataType = 2 + FLOAT TSDataType = 3 + DOUBLE TSDataType = 4 + TEXT TSDataType = 5 + TIMESTAMP TSDataType = 8 + DATE TSDataType = 9 + BLOB TSDataType = 10 + STRING TSDataType = 11 ) const ( - PLAIN TSEncoding = 0 - PLAIN_DICTIONARY TSEncoding = 1 - RLE TSEncoding = 2 - DIFF TSEncoding = 3 - TS_2DIFF TSEncoding = 4 - BITMAP TSEncoding = 5 - GORILLA_V1 TSEncoding = 6 - REGULAR TSEncoding = 7 - GORILLA TSEncoding = 8 - ZIGZAG TSEncoding = 9 - FREQ TSEncoding = 10 - CHIMP TSEncoding = 11 - SPRINTZ TSEncoding = 12 - RLBE TSEncoding = 13 + PLAIN TSEncoding = 0 + DICTIONARY TSEncoding = 1 + RLE TSEncoding = 2 + DIFF TSEncoding = 3 + TS_2DIFF TSEncoding = 4 + BITMAP TSEncoding = 5 + GORILLA_V1 TSEncoding = 6 + REGULAR TSEncoding = 7 + GORILLA TSEncoding = 8 + ZIGZAG TSEncoding = 9 + FREQ TSEncoding = 10 + CHIMP TSEncoding = 11 + SPRINTZ TSEncoding = 12 + RLBE TSEncoding = 13 ) const ( UNCOMPRESSED TSCompressionType = 0 SNAPPY TSCompressionType = 1 GZIP TSCompressionType = 2 - LZO TSCompressionType = 3 - SDT TSCompressionType = 4 - PAA TSCompressionType = 5 - PLA TSCompressionType = 6 LZ4 TSCompressionType = 7 - ZSTD TSCompressionType = 8 + ZSTD TSCompressionType = 8 LZMA2 TSCompressionType = 9 ) @@ -201,4 +201,4 @@ const ( CqAlreadyActive int32 = 1401 CqAlreadyExist int32 = 1402 CqUpdateLastExecTimeError int32 = 1403 -) \ No newline at end of file +) diff --git a/client/rpcdataset.go b/client/rpcdataset.go index 0dd77cd..8477950 100644 --- a/client/rpcdataset.go +++ b/client/rpcdataset.go @@ -39,12 +39,16 @@ const ( var ( errClosed error = errors.New("DataSet is Closed") tsTypeMap map[string]TSDataType = map[string]TSDataType{ - "BOOLEAN": BOOLEAN, - "INT32": INT32, - "INT64": INT64, - "FLOAT": FLOAT, - "DOUBLE": DOUBLE, - "TEXT": TEXT, + "BOOLEAN": BOOLEAN, + "INT32": INT32, + "INT64": INT64, + "FLOAT": FLOAT, + "DOUBLE": DOUBLE, + "TEXT": TEXT, + "TIMESTAMP": TIMESTAMP, + "DATE": DATE, + "BLOB": BLOB, + "STRING": STRING, } ) @@ -116,10 +120,10 @@ func (s *IoTDBRpcDataSet) constructOneRow() error { case BOOLEAN: s.values[i] = valueBuffer[:1] s.queryDataSet.ValueList[i] = valueBuffer[1:] - case INT32: + case INT32, DATE: s.values[i] = valueBuffer[:4] s.queryDataSet.ValueList[i] = valueBuffer[4:] - case INT64: + case INT64, TIMESTAMP: s.values[i] = valueBuffer[:8] s.queryDataSet.ValueList[i] = valueBuffer[8:] case FLOAT: @@ -128,7 +132,7 @@ func (s *IoTDBRpcDataSet) constructOneRow() error { case DOUBLE: s.values[i] = valueBuffer[:8] s.queryDataSet.ValueList[i] = valueBuffer[8:] - case TEXT: + case TEXT, BLOB, STRING: length := bytesToInt32(valueBuffer[:4]) s.values[i] = valueBuffer[4 : 4+length] s.queryDataSet.ValueList[i] = valueBuffer[4+length:] @@ -178,7 +182,7 @@ func (s *IoTDBRpcDataSet) getString(columnIndex int, dataType TSDataType) string return "false" case INT32: return int32ToString(bytesToInt32(valueBytes)) - case INT64: + case INT64, TIMESTAMP: return int64ToString(bytesToInt64(valueBytes)) case FLOAT: bits := binary.BigEndian.Uint32(valueBytes) @@ -186,8 +190,16 @@ func (s *IoTDBRpcDataSet) getString(columnIndex int, dataType TSDataType) string case DOUBLE: bits := binary.BigEndian.Uint64(valueBytes) return float64ToString(math.Float64frombits(bits)) - case TEXT: + case TEXT, STRING: return string(valueBytes) + case BLOB: + return bytesToHexString(valueBytes) + case DATE: + date, err := bytesToDate(valueBytes) + if err != nil { + return "" + } + return date.Format("2006-01-02") default: return "" } @@ -206,10 +218,10 @@ func (s *IoTDBRpcDataSet) getValue(columnName string) interface{} { valueBytes := s.values[columnIndex] switch dataType { case BOOLEAN: - return bool(valueBytes[0] != 0) + return valueBytes[0] != 0 case INT32: return bytesToInt32(valueBytes) - case INT64: + case INT64, TIMESTAMP: return bytesToInt64(valueBytes) case FLOAT: bits := binary.BigEndian.Uint32(valueBytes) @@ -217,8 +229,16 @@ func (s *IoTDBRpcDataSet) getValue(columnName string) interface{} { case DOUBLE: bits := binary.BigEndian.Uint64(valueBytes) return math.Float64frombits(bits) - case TEXT: + case TEXT, STRING: return string(valueBytes) + case BLOB: + return valueBytes + case DATE: + date, err := bytesToDate(valueBytes) + if err != nil { + return nil + } + return date default: return nil } @@ -281,7 +301,7 @@ func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error { case BOOLEAN: switch t := d.(type) { case *bool: - *t = bool(valueBytes[0] != 0) + *t = valueBytes[0] != 0 case *string: if valueBytes[0] != 0 { *t = "true" @@ -301,7 +321,7 @@ func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error { default: return fmt.Errorf("dest[%d] types must be *int32 or *string", i) } - case INT64: + case INT64, TIMESTAMP: switch t := d.(type) { case *int64: *t = bytesToInt64(valueBytes) @@ -332,12 +352,37 @@ func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error { default: return fmt.Errorf("dest[%d] types must be *float64 or *string", i) } - case TEXT: + case TEXT, STRING: switch t := d.(type) { + case *[]byte: + *t = valueBytes case *string: *t = string(valueBytes) default: - return fmt.Errorf("dest[%d] types must be *string", i) + return fmt.Errorf("dest[%d] types must be *[]byte or *string", i) + } + case BLOB: + switch t := d.(type) { + case *[]byte: + *t = valueBytes + case *string: + *t = bytesToHexString(valueBytes) + default: + return fmt.Errorf("dest[%d] types must be *[]byte or *string", i) + } + case DATE: + switch t := d.(type) { + case *time.Time: + *t, _ = bytesToDate(valueBytes) + case *string: + *t = int32ToString(bytesToInt32(valueBytes)) + date, err := bytesToDate(valueBytes) + if err != nil { + *t = "" + } + *t = date.Format("2006-01-02") + default: + return fmt.Errorf("dest[%d] types must be *time.Time or *string", i) } default: return nil @@ -412,7 +457,7 @@ func (s *IoTDBRpcDataSet) hasCachedResults() bool { if s.closed { return false } - return (s.queryDataSet != nil && len(s.queryDataSet.Time) > 0) + return s.queryDataSet != nil && len(s.queryDataSet.Time) > 0 } func (s *IoTDBRpcDataSet) next() (bool, error) { diff --git a/client/rpcdataset_test.go b/client/rpcdataset_test.go index 4201401..82dd3b0 100644 --- a/client/rpcdataset_test.go +++ b/client/rpcdataset_test.go @@ -20,6 +20,7 @@ package client import ( + "bytes" "reflect" "testing" "time" @@ -28,23 +29,49 @@ import ( ) func createIoTDBRpcDataSet() *IoTDBRpcDataSet { - columns := []string{"root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status"} - dataTypes := []string{"INT32", "DOUBLE", "INT64", "FLOAT", "TEXT", "BOOLEAN"} + columns := []string{ + "root.ln.device1.restart_count", + "root.ln.device1.price", + "root.ln.device1.tick_count", + "root.ln.device1.temperature", + "root.ln.device1.description", + "root.ln.device1.status", + "root.ln.device1.description_string", + "root.ln.device1.description_blob", + "root.ln.device1.date", + "root.ln.device1.ts", + } + dataTypes := []string{"INT32", "DOUBLE", "INT64", "FLOAT", "TEXT", "BOOLEAN", "STRING", "BLOB", "DATE", "TIMESTAMP"} columnNameIndex := map[string]int32{ - "root.ln.device1.restart_count": 2, - "root.ln.device1.price": 1, - "root.ln.device1.tick_count": 5, - "root.ln.device1.temperature": 4, - "root.ln.device1.description": 0, - "root.ln.device1.status": 3, + "root.ln.device1.restart_count": 2, + "root.ln.device1.price": 1, + "root.ln.device1.tick_count": 5, + "root.ln.device1.temperature": 4, + "root.ln.device1.description": 0, + "root.ln.device1.status": 3, + "root.ln.device1.description_string": 6, + "root.ln.device1.description_blob": 7, + "root.ln.device1.date": 8, + "root.ln.device1.ts": 9, } var queyrId int64 = 1 var sessionId int64 = 1 var client *rpc.IClientRPCServiceClient = nil queryDataSet := rpc.TSQueryDataSet{ - Time: []byte{0, 0, 1, 118, 76, 52, 0, 236, 0, 0, 1, 118, 76, 52, 25, 228, 0, 0, 1, 118, 76, 52, 41, 42, 0, 0, 1, 118, 76, 52, 243, 148, 0, 0, 1, 118, 76, 95, 98, 255}, - ValueList: [][]byte{{0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49}, {64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205}, {0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1}, {1, 1, 1, 1, 1}, {65, 65, 153, 154, 65, 65, 153, 154, 65, 65, 153, 154, 65, 65, 153, 154, 65, 65, 153, 154}, {0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213}}, - BitmapList: [][]byte{{248}, {248}, {248}, {248}, {248}, {248}}, + Time: []byte{0, 0, 1, 118, 76, 52, 0, 236, 0, 0, 1, 118, 76, 52, 25, 228, 0, 0, 1, 118, 76, 52, 41, 42, 0, 0, 1, 118, 76, 52, 243, 148, 0, 0, 1, 118, 76, 95, 98, 255}, + ValueList: [][]byte{ + {0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49}, + {64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205}, + {0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1}, + {1, 1, 1, 1, 1}, + {65, 65, 153, 154, 65, 65, 153, 154, 65, 65, 153, 154, 65, 65, 153, 154, 65, 65, 153, 154}, + {0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213}, + {0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49}, + {0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49}, + {1, 52, 216, 17, 1, 52, 216, 17, 1, 52, 216, 17, 1, 52, 216, 17, 1, 52, 216, 17}, + {0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213}, + }, + BitmapList: [][]byte{{248}, {248}, {248}, {248}, {248}, {248}, {248}, {248}, {248}, {248}}, } return NewIoTDBRpcDataSet("select * from root.ln.device1", columns, dataTypes, columnNameIndex, queyrId, client, sessionId, &queryDataSet, false, DefaultFetchSize, nil) } @@ -558,11 +585,27 @@ func TestIoTDBRpcDataSet_getRowRecord(t *testing.T) { }, { name: "root.ln.device1.description", dataType: TEXT, - value: string("Test Device 1"), + value: "Test Device 1", }, { name: "root.ln.device1.status", dataType: BOOLEAN, - value: bool(true), + value: true, + }, { + name: "root.ln.device1.description_string", + dataType: STRING, + value: "Test Device 1", + }, { + name: "root.ln.device1.description_blob", + dataType: BLOB, + value: []byte("Test Device 1"), + }, { + name: "root.ln.device1.date", + dataType: DATE, + value: time.Date(2024, time.April, 1, 0, 0, 0, 0, time.UTC), + }, { + name: "root.ln.device1.ts", + dataType: TIMESTAMP, + value: int64(3333333), }, }, }, @@ -583,10 +626,14 @@ func TestIoTDBRpcDataSet_getRowRecord(t *testing.T) { for i := 0; i < len(got.fields); i++ { gotField := got.fields[i] wantField := tt.want.fields[i] - - if gotField.dataType != wantField.dataType || gotField.name != wantField.name || gotField.value != wantField.value { - match = false - + if gotField.dataType != BLOB { + if gotField.dataType != wantField.dataType || gotField.name != wantField.name || gotField.value != wantField.value { + match = false + } + } else { + if gotField.dataType != wantField.dataType || gotField.name != wantField.name || !bytes.Equal(gotField.value.([]byte), wantField.value.([]byte)) { + match = false + } } } if !match { diff --git a/client/session.go b/client/session.go index 47985f8..288faf0 100644 --- a/client/session.go +++ b/client/session.go @@ -90,9 +90,6 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err s.trans = thrift.NewTSocketConf(net.JoinHostPort(s.config.Host, s.config.Port), &thrift.TConfiguration{ ConnectTimeout: time.Duration(connectionTimeoutInMs) * time.Millisecond, // Use 0 for no timeout }) - if err != nil { - return err - } // s.trans = thrift.NewTFramedTransport(s.trans) // deprecated var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE} s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf) @@ -951,7 +948,7 @@ func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, error) default: return nil, fmt.Errorf("values[%d] %v(%v) must be int32", i, v, reflect.TypeOf(v)) } - case INT64: + case INT64, TIMESTAMP: switch v.(type) { case int64: binary.Write(buff, binary.BigEndian, v) @@ -972,77 +969,41 @@ func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, error) default: return nil, fmt.Errorf("values[%d] %v(%v) must be float64", i, v, reflect.TypeOf(v)) } - case TEXT: + case TEXT, STRING: switch s := v.(type) { case string: size := len(s) binary.Write(buff, binary.BigEndian, int32(size)) binary.Write(buff, binary.BigEndian, []byte(s)) + case []byte: + size := len(s) + binary.Write(buff, binary.BigEndian, int32(size)) + binary.Write(buff, binary.BigEndian, s) default: - return nil, fmt.Errorf("values[%d] %v(%v) must be string", i, v, reflect.TypeOf(v)) - } - default: - return nil, fmt.Errorf("types[%d] is incorrect, it must in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT)", i) - } - } - return buff.Bytes(), nil -} - -func valuesToBytesForFast(dataTypes []TSDataType, values []interface{}) ([]byte, error) { - buff := &bytes.Buffer{} - for i, t := range dataTypes { - v := values[i] - if v == nil { - return nil, fmt.Errorf("values[%d] can't be nil", i) - } - - switch t { - case BOOLEAN: - switch v.(type) { - case bool: - binary.Write(buff, binary.BigEndian, v) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be bool", i, v, reflect.TypeOf(v)) - } - case INT32: - switch v.(type) { - case int32: - binary.Write(buff, binary.BigEndian, v) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be int32", i, v, reflect.TypeOf(v)) - } - case INT64: - switch v.(type) { - case int64: - binary.Write(buff, binary.BigEndian, v) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be int64", i, v, reflect.TypeOf(v)) - } - case FLOAT: - switch v.(type) { - case float32: - binary.Write(buff, binary.BigEndian, v) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be float32", i, v, reflect.TypeOf(v)) - } - case DOUBLE: - switch v.(type) { - case float64: - binary.Write(buff, binary.BigEndian, v) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be float64", i, v, reflect.TypeOf(v)) + return nil, fmt.Errorf("values[%d] %v(%v) must be string or []byte", i, v, reflect.TypeOf(v)) } - case TEXT: + case BLOB: switch s := v.(type) { - case string: + case []byte: size := len(s) binary.Write(buff, binary.BigEndian, int32(size)) - binary.Write(buff, binary.BigEndian, []byte(s)) + binary.Write(buff, binary.BigEndian, s) + default: + return nil, fmt.Errorf("values[%d] %v(%v) must be []byte", i, v, reflect.TypeOf(v)) + } + case DATE: + switch s := v.(type) { + case time.Time: + date, err := dateToInt32(s) + if err != nil { + return nil, err + } + binary.Write(buff, binary.BigEndian, date) default: - return nil, fmt.Errorf("values[%d] %v(%v) must be string", i, v, reflect.TypeOf(v)) + return nil, fmt.Errorf("values[%d] %v(%v) must be time.Time", i, v, reflect.TypeOf(v)) } default: - return nil, fmt.Errorf("types[%d] is incorrect, it must in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT)", i) + return nil, fmt.Errorf("types[%d] is incorrect, it must in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP, BLOB, DATE, STRING)", i) } } return buff.Bytes(), nil diff --git a/client/tablet.go b/client/tablet.go index ff4ef57..e5d7daa 100644 --- a/client/tablet.go +++ b/client/tablet.go @@ -25,6 +25,7 @@ import ( "fmt" "reflect" "sort" + "time" ) type MeasurementSchema struct { @@ -52,10 +53,10 @@ func (t *Tablet) Swap(i, j int) { case BOOLEAN: sortedSlice := t.values[index].([]bool) sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i] - case INT32: + case INT32, DATE: sortedSlice := t.values[index].([]int32) sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i] - case INT64: + case INT64, TIMESTAMP: sortedSlice := t.values[index].([]int64) sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i] case FLOAT: @@ -64,8 +65,8 @@ func (t *Tablet) Swap(i, j int) { case DOUBLE: sortedSlice := t.values[index].([]float64) sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i] - case TEXT: - sortedSlice := t.values[index].([]string) + case TEXT, BLOB, STRING: + sortedSlice := t.values[index].([][]byte) sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i] } } @@ -141,7 +142,7 @@ func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error default: return fmt.Errorf("illegal argument value %v %v", value, reflect.TypeOf(value)) } - case INT64: + case INT64, TIMESTAMP: values := t.values[columnIndex].([]int64) switch v := value.(type) { case int64: @@ -171,13 +172,33 @@ func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error default: return fmt.Errorf("illegal argument value %v %v", value, reflect.TypeOf(value)) } - case TEXT: - values := t.values[columnIndex].([]string) + case TEXT, STRING: + values := t.values[columnIndex].([][]byte) switch v := value.(type) { case string: + values[rowIndex] = []byte(v) + case []byte: values[rowIndex] = v + default: + return fmt.Errorf("illegal argument value %v %v", value, reflect.TypeOf(value)) + } + case BLOB: + values := t.values[columnIndex].([][]byte) + switch v := value.(type) { case []byte: - values[rowIndex] = string(v) + values[rowIndex] = v + default: + return fmt.Errorf("illegal argument value %v %v", value, reflect.TypeOf(value)) + } + case DATE: + values := t.values[columnIndex].([]int32) + switch v := value.(type) { + case time.Time: + val, err := dateToInt32(v) + if err != nil { + return err + } + values[rowIndex] = val default: return fmt.Errorf("illegal argument value %v %v", value, reflect.TypeOf(value)) } @@ -194,7 +215,6 @@ func (t *Tablet) GetValueAt(columnIndex, rowIndex int) (interface{}, error) { return nil, fmt.Errorf("illegal argument columnIndex %d", columnIndex) } - if rowIndex < 0 || rowIndex > t.maxRowNumber { return nil, fmt.Errorf("illegal argument rowIndex %d", rowIndex) } @@ -209,14 +229,18 @@ func (t *Tablet) GetValueAt(columnIndex, rowIndex int) (interface{}, error) { return t.values[columnIndex].([]bool)[rowIndex], nil case INT32: return t.values[columnIndex].([]int32)[rowIndex], nil - case INT64: + case INT64, TIMESTAMP: return t.values[columnIndex].([]int64)[rowIndex], nil case FLOAT: return t.values[columnIndex].([]float32)[rowIndex], nil case DOUBLE: return t.values[columnIndex].([]float64)[rowIndex], nil - case TEXT: - return t.values[columnIndex].([]string)[rowIndex], nil + case TEXT, STRING: + return string(t.values[columnIndex].([][]byte)[rowIndex]), nil + case BLOB: + return t.values[columnIndex].([][]byte)[rowIndex], nil + case DATE: + return int32ToDate(t.values[columnIndex].([]int32)[rowIndex]) default: return nil, fmt.Errorf("illegal datatype %v", schema.DataType) } @@ -250,18 +274,18 @@ func (t *Tablet) getValuesBytes() ([]byte, error) { switch schema.DataType { case BOOLEAN: binary.Write(buff, binary.BigEndian, t.values[i].([]bool)[0:t.RowSize]) - case INT32: + case INT32, DATE: binary.Write(buff, binary.BigEndian, t.values[i].([]int32)[0:t.RowSize]) - case INT64: + case INT64, TIMESTAMP: binary.Write(buff, binary.BigEndian, t.values[i].([]int64)[0:t.RowSize]) case FLOAT: binary.Write(buff, binary.BigEndian, t.values[i].([]float32)[0:t.RowSize]) case DOUBLE: binary.Write(buff, binary.BigEndian, t.values[i].([]float64)[0:t.RowSize]) - case TEXT: - for _, s := range t.values[i].([]string)[0:t.RowSize] { + case TEXT, STRING, BLOB: + for _, s := range t.values[i].([][]byte)[0:t.RowSize] { binary.Write(buff, binary.BigEndian, int32(len(s))) - binary.Write(buff, binary.BigEndian, []byte(s)) + binary.Write(buff, binary.BigEndian, s) } default: return nil, fmt.Errorf("illegal datatype %v", schema.DataType) @@ -301,16 +325,16 @@ func NewTablet(deviceId string, measurementSchemas []*MeasurementSchema, maxRowN switch schema.DataType { case BOOLEAN: tablet.values[i] = make([]bool, maxRowNumber) - case INT32: + case INT32, DATE: tablet.values[i] = make([]int32, maxRowNumber) - case INT64: + case INT64, TIMESTAMP: tablet.values[i] = make([]int64, maxRowNumber) case FLOAT: tablet.values[i] = make([]float32, maxRowNumber) case DOUBLE: tablet.values[i] = make([]float64, maxRowNumber) - case TEXT: - tablet.values[i] = make([]string, maxRowNumber) + case TEXT, STRING, BLOB: + tablet.values[i] = make([][]byte, maxRowNumber) default: return nil, fmt.Errorf("illegal datatype %v", schema.DataType) } diff --git a/client/tablet_test.go b/client/tablet_test.go index 1b7579b..e4ecb73 100644 --- a/client/tablet_test.go +++ b/client/tablet_test.go @@ -22,6 +22,7 @@ package client import ( "reflect" "testing" + "time" ) func createTablet(size int) (*Tablet, error) { @@ -46,6 +47,22 @@ func createTablet(size int) (*Tablet, error) { Measurement: "status", DataType: BOOLEAN, }, + { + Measurement: "description_string", + DataType: STRING, + }, + { + Measurement: "description_blob", + DataType: BLOB, + }, + { + Measurement: "date", + DataType: DATE, + }, + { + Measurement: "ts", + DataType: TIMESTAMP, + }, }, size) return tablet, err } @@ -83,17 +100,28 @@ func TestTablet_getDataTypes(t *testing.T) { }, { Measurement: "description", DataType: TEXT, - }, - { + }, { Measurement: "status", DataType: BOOLEAN, + }, { + Measurement: "description_string", + DataType: STRING, + }, { + Measurement: "description_blob", + DataType: BLOB, + }, { + Measurement: "date", + DataType: DATE, + }, { + Measurement: "ts", + DataType: TIMESTAMP, }, }, timestamps: []int64{}, values: []interface{}{}, rowCount: 0, }, - want: []int32{int32(INT32), int32(DOUBLE), int32(INT64), int32(FLOAT), int32(TEXT), int32(BOOLEAN)}, + want: []int32{int32(INT32), int32(DOUBLE), int32(INT64), int32(FLOAT), int32(TEXT), int32(BOOLEAN), int32(STRING), int32(BLOB), int32(DATE), int32(TIMESTAMP)}, }, } for _, tt := range tests { @@ -244,6 +272,38 @@ func TestTablet_SetValueAt(t *testing.T) { rowIndex: 0, }, wantErr: false, + }, { + name: "description_string", + args: args{ + value: "Hello world!", + columnIndex: 6, + rowIndex: 0, + }, + wantErr: false, + }, { + name: "description_blob", + args: args{ + value: []byte("Hello world!"), + columnIndex: 7, + rowIndex: 0, + }, + wantErr: false, + }, { + name: "date", + args: args{ + value: time.Date(2024, time.April, 1, 0, 0, 0, 0, time.UTC), + columnIndex: 8, + rowIndex: 0, + }, + wantErr: false, + }, { + name: "ts", + args: args{ + value: int64(1608268702780), + columnIndex: 9, + rowIndex: 0, + }, + wantErr: false, }, } for _, tt := range tests { @@ -303,7 +363,7 @@ func TestTablet_GetValueAt(t *testing.T) { want: float32(36.5), wantErr: false, }, { - name: "STRING", + name: "TEXT", args: args{ columnIndex: 4, rowIndex: 0, @@ -318,15 +378,51 @@ func TestTablet_GetValueAt(t *testing.T) { }, want: true, wantErr: false, + }, { + name: "TEXT", + args: args{ + columnIndex: 6, + rowIndex: 0, + }, + want: "Hello World!", + wantErr: false, + }, { + name: "BLOB", + args: args{ + columnIndex: 7, + rowIndex: 0, + }, + want: []byte("Hello World!"), + wantErr: false, + }, { + name: "DATE", + args: args{ + columnIndex: 8, + rowIndex: 0, + }, + want: time.Date(2024, time.April, 1, 0, 0, 0, 0, time.UTC), + wantErr: false, + }, { + name: "TIMESTAMP", + args: args{ + columnIndex: 9, + rowIndex: 0, + }, + want: int64(1608268702780), + wantErr: false, }, } if tablet, err := createTablet(1); err == nil { tablet.SetValueAt(int32(256), 0, 0) - tablet.SetValueAt(float64(32.768), 1, 0) + tablet.SetValueAt(32.768, 1, 0) tablet.SetValueAt(int64(65535), 2, 0) tablet.SetValueAt(float32(36.5), 3, 0) tablet.SetValueAt("Hello World!", 4, 0) tablet.SetValueAt(true, 5, 0) + tablet.SetValueAt("Hello World!", 6, 0) + tablet.SetValueAt([]byte("Hello World!"), 7, 0) + tablet.SetValueAt(time.Date(2024, time.April, 1, 0, 0, 0, 0, time.UTC), 8, 0) + tablet.SetValueAt(int64(1608268702780), 9, 0) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got, err := tablet.GetValueAt(tt.args.columnIndex, tt.args.rowIndex) @@ -401,6 +497,38 @@ func TestTablet_GetNilValueAt(t *testing.T) { }, want: true, wantErr: false, + }, { + name: "STRING", + args: args{ + columnIndex: 6, + rowIndex: 0, + }, + want: nil, + wantErr: false, + }, { + name: "BLOB", + args: args{ + columnIndex: 7, + rowIndex: 0, + }, + want: nil, + wantErr: false, + }, { + name: "DATE", + args: args{ + columnIndex: 8, + rowIndex: 0, + }, + want: nil, + wantErr: false, + }, { + name: "TIMESTAMP", + args: args{ + columnIndex: 9, + rowIndex: 0, + }, + want: nil, + wantErr: false, }, } if tablet, err := createTablet(1); err == nil { @@ -410,6 +538,10 @@ func TestTablet_GetNilValueAt(t *testing.T) { tablet.SetValueAt(float32(36.5), 3, 0) tablet.SetValueAt("Hello World!", 4, 0) tablet.SetValueAt(true, 5, 0) + tablet.SetValueAt(nil, 6, 0) + tablet.SetValueAt(nil, 7, 0) + tablet.SetValueAt(nil, 8, 0) + tablet.SetValueAt(nil, 9, 0) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got, err := tablet.GetValueAt(tt.args.columnIndex, tt.args.rowIndex) diff --git a/client/utils.go b/client/utils.go index f3c465b..692e9a5 100644 --- a/client/utils.go +++ b/client/utils.go @@ -22,9 +22,11 @@ package client import ( "bytes" "encoding/binary" + "errors" "fmt" "github.com/apache/iotdb-client-go/common" "strconv" + "time" ) func int32ToString(n int32) string { @@ -56,17 +58,61 @@ func int64ToBytes(n int64) []byte { } func bytesToInt32(bys []byte) int32 { - bytebuff := bytes.NewBuffer(bys) + bytesBuffer := bytes.NewBuffer(bys) var data int32 - binary.Read(bytebuff, binary.BigEndian, &data) - return int32(data) + binary.Read(bytesBuffer, binary.BigEndian, &data) + return data } func bytesToInt64(bys []byte) int64 { - bytebuff := bytes.NewBuffer(bys) + bytesBuffer := bytes.NewBuffer(bys) var data int64 - binary.Read(bytebuff, binary.BigEndian, &data) - return int64(data) + binary.Read(bytesBuffer, binary.BigEndian, &data) + return data +} + +func bytesToHexString(input []byte) string { + hexString := "0x" + if input != nil { + for _, b := range input { + hexString += fmt.Sprintf("%02x", b) + } + } + return hexString +} + +func dateToInt32(localDate time.Time) (int32, error) { + if localDate.IsZero() { + return 0, errors.New("date expression is null or empty") + } + + year := localDate.Year() + if year < 1000 || year > 9999 { + return 0, errors.New("year must be between 1000 and 9999") + } + + // Convert to YYYY/MM/DD format + result := year*10000 + int(localDate.Month())*100 + localDate.Day() + return int32(result), nil +} + +func int32ToDate(val int32) (time.Time, error) { + date := int(val) + year := date / 10000 + month := (date / 100) % 100 + day := date % 100 + + localDate := time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC) + + if localDate.Year() != year || int(localDate.Month()) != month || localDate.Day() != day { + return time.Time{}, errors.New("invalid date format") + } + + return localDate, nil +} + +func bytesToDate(bys []byte) (time.Time, error) { + return int32ToDate(bytesToInt32(bys)) } func verifySuccesses(statuses []*common.TSStatus) error { @@ -76,8 +122,8 @@ func verifySuccesses(statuses []*common.TSStatus) error { buff.WriteString(*status.Message + ";") } } - errMsgs := buff.String() - if len(errMsgs) > 0 { + errMsg := buff.String() + if len(errMsg) > 0 { return NewBatchError(statuses) } return nil @@ -96,9 +142,9 @@ func VerifySuccess(status *common.TSStatus) error { } if status.Code != SuccessStatus { if status.Message != nil { - return fmt.Errorf("Error Code: %d, Message: %v", status.Code, *status.Message) + return fmt.Errorf("error code: %d, message: %v", status.Code, *status.Message) } else { - return fmt.Errorf("Error Code: %d", status.Code) + return fmt.Errorf("error code: %d", status.Code) } } return nil diff --git a/client/utils_test.go b/client/utils_test.go index f59a34b..00a8915 100644 --- a/client/utils_test.go +++ b/client/utils_test.go @@ -164,7 +164,7 @@ func Test_float64ToString(t *testing.T) { want string }{ { - name: "Test Flota64", + name: "Test Float64", args: args{ val: 0.39751212862981283, }, @@ -180,11 +180,37 @@ func Test_float64ToString(t *testing.T) { } } +func Test_bytesToHexString(t *testing.T) { + type args struct { + val []byte + } + tests := []struct { + name string + args args + want string + }{ + { + name: "Test bytes", + args: args{ + val: []byte("bytes"), + }, + want: "0x6279746573", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := bytesToHexString(tt.args.val); got != tt.want { + t.Errorf("bytesToHexString() = %v, want %v", got, tt.want) + } + }) + } +} + func Test_verifySuccess(t *testing.T) { type args struct { status *common.TSStatus } - var errMsg string = "error occurred" + var errMsg = "error occurred" tests := []struct { name string args args @@ -250,9 +276,9 @@ func Test_verifySuccesses(t *testing.T) { type args struct { statuses []*common.TSStatus } - var internalServerError string = "InternalServerError" - var success string = "Success" - var redirectionRecommend string = "RedirectionRecommend" + var internalServerError = "InternalServerError" + var success = "Success" + var redirectionRecommend = "RedirectionRecommend" tests := []struct { name string args args diff --git a/example/session_example.go b/example/session_example.go index fbfba4f..4777486 100644 --- a/example/session_example.go +++ b/example/session_example.go @@ -227,13 +227,13 @@ func printDataSet0(sessionDataSet *client.SessionDataSet) { fmt.Print(sessionDataSet.GetBool(columnName)) case client.INT32: fmt.Print(sessionDataSet.GetInt32(columnName)) - case client.INT64: + case client.INT64, client.TIMESTAMP: fmt.Print(sessionDataSet.GetInt64(columnName)) case client.FLOAT: fmt.Print(sessionDataSet.GetFloat(columnName)) case client.DOUBLE: fmt.Print(sessionDataSet.GetDouble(columnName)) - case client.TEXT: + case client.TEXT, client.STRING, client.BLOB, client.DATE: fmt.Print(sessionDataSet.GetText(columnName)) default: } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index c7a971e..fd50245 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -171,7 +171,7 @@ func (s *e2eTestSuite) Test_InsertAlignedRecords() { var ( deviceIds = []string{"root.al1.dev2", "root.al1.dev3"} measurements = [][]string{{"status"}, {"temperature"}} - dataTypes = [][]client.TSDataType{{client.TEXT}, {client.TEXT}} + dataTypes = [][]client.TSDataType{{client.TEXT}, {client.STRING}} values = [][]interface{}{{"33"}, {"44"}} timestamps = []int64{12, 13} ) @@ -193,26 +193,29 @@ func (s *e2eTestSuite) Test_InsertAlignedRecordsOfOneDevice() { measurementsSlice = [][]string{ {"restart_count", "tick_count", "price"}, {"temperature", "description", "status"}, + {"description_blob", "date", "ts"}, } dataTypes = [][]client.TSDataType{ {client.INT32, client.INT64, client.DOUBLE}, {client.FLOAT, client.TEXT, client.BOOLEAN}, + {client.BLOB, client.DATE, client.TIMESTAMP}, } values = [][]interface{}{ {int32(1), int64(2018), float64(1988.1)}, {float32(12.1), "Test Device 1", false}, + {[]byte("Test Device 1"), time.Date(2024, time.Month(4), 1, 0, 0, 0, 0, time.UTC), ts}, } - timestamps = []int64{ts, ts - 1} + timestamps = []int64{ts, ts - 1, ts - 2} ) s.checkError(s.session.InsertAlignedRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false)) - ds, err := s.session.ExecuteStatement("select temperature from root.al1.dev4") + ds, err := s.session.ExecuteStatement("select * from root.al1.dev4") assert := s.Require() assert.NoError(err) defer ds.Close() assert.True(ds.Next()) var status string assert.NoError(ds.Scan(&status)) - assert.Equal(status, "12.1") + assert.Equal(status, "2024-04-01") } func (s *e2eTestSuite) Test_InsertAlignedTablet() { var timeseries = []string{"root.ln.device1.**"}