Skip to content

Commit

Permalink
dev STRING BLOB
Browse files Browse the repository at this point in the history
  • Loading branch information
HTHou committed Sep 26, 2024
1 parent c92755d commit 83fe9c6
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 101 deletions.
4 changes: 4 additions & 0 deletions client/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,7 @@ func (f *Field) GetText() string {
}
return ""
}

func (f *Field) GetBlob() []byte {
return f.value.([]byte)
}
38 changes: 23 additions & 15 deletions client/rpcdataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ const (
var (
errClosed error = errors.New("DataSet is Closed")
tsTypeMap map[string]TSDataType = map[string]TSDataType{
"BOOLEAN": BOOLEAN,
"INT32": INT32,
"INT64": INT64,
"FLOAT": FLOAT,
"DOUBLE": DOUBLE,
"TEXT": TEXT,
"BOOLEAN": BOOLEAN,
"INT32": INT32,
"INT64": INT64,
"FLOAT": FLOAT,
"DOUBLE": DOUBLE,
"TEXT": TEXT,
"TIMESTAMP": TIMESTAMP,
"DATE": DATE,
"BLOB": BLOB,
"STRING": STRING,
}
)

Expand Down Expand Up @@ -119,7 +123,7 @@ func (s *IoTDBRpcDataSet) constructOneRow() error {
case INT32:
s.values[i] = valueBuffer[:4]
s.queryDataSet.ValueList[i] = valueBuffer[4:]
case INT64:
case INT64, TIMESTAMP:
s.values[i] = valueBuffer[:8]
s.queryDataSet.ValueList[i] = valueBuffer[8:]
case FLOAT:
Expand All @@ -128,7 +132,7 @@ func (s *IoTDBRpcDataSet) constructOneRow() error {
case DOUBLE:
s.values[i] = valueBuffer[:8]
s.queryDataSet.ValueList[i] = valueBuffer[8:]
case TEXT:
case TEXT, BLOB, STRING:
length := bytesToInt32(valueBuffer[:4])
s.values[i] = valueBuffer[4 : 4+length]
s.queryDataSet.ValueList[i] = valueBuffer[4+length:]
Expand Down Expand Up @@ -178,16 +182,18 @@ func (s *IoTDBRpcDataSet) getString(columnIndex int, dataType TSDataType) string
return "false"
case INT32:
return int32ToString(bytesToInt32(valueBytes))
case INT64:
case INT64, TIMESTAMP:
return int64ToString(bytesToInt64(valueBytes))
case FLOAT:
bits := binary.BigEndian.Uint32(valueBytes)
return float32ToString(math.Float32frombits(bits))
case DOUBLE:
bits := binary.BigEndian.Uint64(valueBytes)
return float64ToString(math.Float64frombits(bits))
case TEXT:
case TEXT, STRING:
return string(valueBytes)
case BLOB:
return parseBytesToString(valueBytes)
default:
return ""
}
Expand All @@ -206,19 +212,21 @@ func (s *IoTDBRpcDataSet) getValue(columnName string) interface{} {
valueBytes := s.values[columnIndex]
switch dataType {
case BOOLEAN:
return bool(valueBytes[0] != 0)
return valueBytes[0] != 0
case INT32:
return bytesToInt32(valueBytes)
case INT64:
case INT64, TIMESTAMP:
return bytesToInt64(valueBytes)
case FLOAT:
bits := binary.BigEndian.Uint32(valueBytes)
return math.Float32frombits(bits)
case DOUBLE:
bits := binary.BigEndian.Uint64(valueBytes)
return math.Float64frombits(bits)
case TEXT:
case TEXT, STRING:
return string(valueBytes)
case BLOB:
return valueBytes
default:
return nil
}
Expand Down Expand Up @@ -301,7 +309,7 @@ func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error {
default:
return fmt.Errorf("dest[%d] types must be *int32 or *string", i)
}
case INT64:
case INT64, TIMESTAMP:
switch t := d.(type) {
case *int64:
*t = bytesToInt64(valueBytes)
Expand Down Expand Up @@ -332,7 +340,7 @@ func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error {
default:
return fmt.Errorf("dest[%d] types must be *float64 or *string", i)
}
case TEXT:
case TEXT, STRING, BLOB:
switch t := d.(type) {
case *string:
*t = string(valueBytes)
Expand Down
75 changes: 7 additions & 68 deletions client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
s.trans = thrift.NewTSocketConf(net.JoinHostPort(s.config.Host, s.config.Port), &thrift.TConfiguration{
ConnectTimeout: time.Duration(connectionTimeoutInMs) * time.Millisecond, // Use 0 for no timeout
})
if err != nil {
return err
}
// s.trans = thrift.NewTFramedTransport(s.trans) // deprecated
var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
Expand Down Expand Up @@ -951,8 +948,7 @@ func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, error)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be int32", i, v, reflect.TypeOf(v))
}
case INT64:
case TIMESTAMP:
case INT64, TIMESTAMP:
switch v.(type) {
case int64:
binary.Write(buff, binary.BigEndian, v)
Expand All @@ -973,82 +969,25 @@ func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, error)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be float64", i, v, reflect.TypeOf(v))
}
case TEXT:
case STRING:
case TEXT, STRING:
switch s := v.(type) {
case string:
size := len(s)
binary.Write(buff, binary.BigEndian, int32(size))
binary.Write(buff, binary.BigEndian, []byte(s))
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be string", i, v, reflect.TypeOf(v))
}
case BLOB:
switch s := v.(type) {
case string:
case []byte:
size := len(s)
binary.Write(buff, binary.BigEndian, int32(size))
binary.Write(buff, binary.BigEndian, []byte(s))
binary.Write(buff, binary.BigEndian, s)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be string", i, v, reflect.TypeOf(v))
}
default:
return nil, fmt.Errorf("types[%d] is incorrect, it must in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT)", i)
}
}
return buff.Bytes(), nil
}

func valuesToBytesForFast(dataTypes []TSDataType, values []interface{}) ([]byte, error) {
buff := &bytes.Buffer{}
for i, t := range dataTypes {
v := values[i]
if v == nil {
return nil, fmt.Errorf("values[%d] can't be nil", i)
}

switch t {
case BOOLEAN:
switch v.(type) {
case bool:
binary.Write(buff, binary.BigEndian, v)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be bool", i, v, reflect.TypeOf(v))
}
case INT32:
switch v.(type) {
case int32:
binary.Write(buff, binary.BigEndian, v)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be int32", i, v, reflect.TypeOf(v))
}
case INT64:
switch v.(type) {
case int64:
binary.Write(buff, binary.BigEndian, v)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be int64", i, v, reflect.TypeOf(v))
}
case FLOAT:
switch v.(type) {
case float32:
binary.Write(buff, binary.BigEndian, v)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be float32", i, v, reflect.TypeOf(v))
}
case DOUBLE:
switch v.(type) {
case float64:
binary.Write(buff, binary.BigEndian, v)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be float64", i, v, reflect.TypeOf(v))
}
case TEXT:
case BLOB:
switch s := v.(type) {
case string:
case []byte:
size := len(s)
binary.Write(buff, binary.BigEndian, int32(size))
binary.Write(buff, binary.BigEndian, []byte(s))
binary.Write(buff, binary.BigEndian, s)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be string", i, v, reflect.TypeOf(v))
}
Expand Down
43 changes: 25 additions & 18 deletions client/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (t *Tablet) Swap(i, j int) {
case INT32:
sortedSlice := t.values[index].([]int32)
sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i]
case INT64:
case INT64, TIMESTAMP:
sortedSlice := t.values[index].([]int64)
sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i]
case FLOAT:
Expand All @@ -64,8 +64,8 @@ func (t *Tablet) Swap(i, j int) {
case DOUBLE:
sortedSlice := t.values[index].([]float64)
sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i]
case TEXT:
sortedSlice := t.values[index].([]string)
case TEXT, BLOB, STRING:
sortedSlice := t.values[index].([][]byte)
sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i]
}
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error
default:
return fmt.Errorf("illegal argument value %v %v", value, reflect.TypeOf(value))
}
case INT64:
case INT64, TIMESTAMP:
values := t.values[columnIndex].([]int64)
switch v := value.(type) {
case int64:
Expand Down Expand Up @@ -171,13 +171,21 @@ func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error
default:
return fmt.Errorf("illegal argument value %v %v", value, reflect.TypeOf(value))
}
case TEXT:
values := t.values[columnIndex].([]string)
case TEXT, STRING:
values := t.values[columnIndex].([][]byte)
switch v := value.(type) {
case string:
values[rowIndex] = []byte(v)
case []byte:
values[rowIndex] = v
default:
return fmt.Errorf("illegal argument value %v %v", value, reflect.TypeOf(value))
}
case BLOB:
values := t.values[columnIndex].([][]byte)
switch v := value.(type) {
case []byte:
values[rowIndex] = string(v)
values[rowIndex] = v
default:
return fmt.Errorf("illegal argument value %v %v", value, reflect.TypeOf(value))
}
Expand All @@ -194,7 +202,6 @@ func (t *Tablet) GetValueAt(columnIndex, rowIndex int) (interface{}, error) {
return nil, fmt.Errorf("illegal argument columnIndex %d", columnIndex)
}


if rowIndex < 0 || rowIndex > t.maxRowNumber {
return nil, fmt.Errorf("illegal argument rowIndex %d", rowIndex)
}
Expand All @@ -209,14 +216,14 @@ func (t *Tablet) GetValueAt(columnIndex, rowIndex int) (interface{}, error) {
return t.values[columnIndex].([]bool)[rowIndex], nil
case INT32:
return t.values[columnIndex].([]int32)[rowIndex], nil
case INT64:
case INT64, TIMESTAMP:
return t.values[columnIndex].([]int64)[rowIndex], nil
case FLOAT:
return t.values[columnIndex].([]float32)[rowIndex], nil
case DOUBLE:
return t.values[columnIndex].([]float64)[rowIndex], nil
case TEXT:
return t.values[columnIndex].([]string)[rowIndex], nil
case TEXT, STRING, BLOB:
return t.values[columnIndex].([][]byte)[rowIndex], nil
default:
return nil, fmt.Errorf("illegal datatype %v", schema.DataType)
}
Expand Down Expand Up @@ -252,16 +259,16 @@ func (t *Tablet) getValuesBytes() ([]byte, error) {
binary.Write(buff, binary.BigEndian, t.values[i].([]bool)[0:t.RowSize])
case INT32:
binary.Write(buff, binary.BigEndian, t.values[i].([]int32)[0:t.RowSize])
case INT64:
case INT64, TIMESTAMP:
binary.Write(buff, binary.BigEndian, t.values[i].([]int64)[0:t.RowSize])
case FLOAT:
binary.Write(buff, binary.BigEndian, t.values[i].([]float32)[0:t.RowSize])
case DOUBLE:
binary.Write(buff, binary.BigEndian, t.values[i].([]float64)[0:t.RowSize])
case TEXT:
for _, s := range t.values[i].([]string)[0:t.RowSize] {
case TEXT, STRING, BLOB:
for _, s := range t.values[i].([][]byte)[0:t.RowSize] {
binary.Write(buff, binary.BigEndian, int32(len(s)))
binary.Write(buff, binary.BigEndian, []byte(s))
binary.Write(buff, binary.BigEndian, s)
}
default:
return nil, fmt.Errorf("illegal datatype %v", schema.DataType)
Expand Down Expand Up @@ -303,14 +310,14 @@ func NewTablet(deviceId string, measurementSchemas []*MeasurementSchema, maxRowN
tablet.values[i] = make([]bool, maxRowNumber)
case INT32:
tablet.values[i] = make([]int32, maxRowNumber)
case INT64:
case INT64, TIMESTAMP:
tablet.values[i] = make([]int64, maxRowNumber)
case FLOAT:
tablet.values[i] = make([]float32, maxRowNumber)
case DOUBLE:
tablet.values[i] = make([]float64, maxRowNumber)
case TEXT:
tablet.values[i] = make([]string, maxRowNumber)
case TEXT, STRING, BLOB:
tablet.values[i] = make([][]byte, maxRowNumber)
default:
return nil, fmt.Errorf("illegal datatype %v", schema.DataType)
}
Expand Down
10 changes: 10 additions & 0 deletions client/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ func bytesToInt64(bys []byte) int64 {
return int64(data)
}

func parseBytesToString(input []byte) string {
hexString := "0x"
if input != nil {
for _, b := range input {
hexString += fmt.Sprintf("%02x", b)
}
}
return hexString
}

func verifySuccesses(statuses []*common.TSStatus) error {
buff := bytes.Buffer{}
for _, status := range statuses {
Expand Down

0 comments on commit 83fe9c6

Please sign in to comment.