From 6ced9d818922daccd4536e00216b7e0017468f6e Mon Sep 17 00:00:00 2001 From: HTHou Date: Thu, 26 Sep 2024 19:15:54 +0800 Subject: [PATCH] dev date --- client/rpcdataset.go | 49 ++++++++++++++++++++++++++++---- client/session.go | 17 +++++++++-- client/tablet.go | 21 ++++++++++++-- client/utils.go | 58 ++++++++++++++++++++++++++++++-------- example/session_example.go | 4 +-- 5 files changed, 124 insertions(+), 25 deletions(-) diff --git a/client/rpcdataset.go b/client/rpcdataset.go index 4b1eb1d..8477950 100644 --- a/client/rpcdataset.go +++ b/client/rpcdataset.go @@ -120,7 +120,7 @@ func (s *IoTDBRpcDataSet) constructOneRow() error { case BOOLEAN: s.values[i] = valueBuffer[:1] s.queryDataSet.ValueList[i] = valueBuffer[1:] - case INT32: + case INT32, DATE: s.values[i] = valueBuffer[:4] s.queryDataSet.ValueList[i] = valueBuffer[4:] case INT64, TIMESTAMP: @@ -193,7 +193,13 @@ func (s *IoTDBRpcDataSet) getString(columnIndex int, dataType TSDataType) string case TEXT, STRING: return string(valueBytes) case BLOB: - return parseBytesToString(valueBytes) + return bytesToHexString(valueBytes) + case DATE: + date, err := bytesToDate(valueBytes) + if err != nil { + return "" + } + return date.Format("2006-01-02") default: return "" } @@ -227,6 +233,12 @@ func (s *IoTDBRpcDataSet) getValue(columnName string) interface{} { return string(valueBytes) case BLOB: return valueBytes + case DATE: + date, err := bytesToDate(valueBytes) + if err != nil { + return nil + } + return date default: return nil } @@ -289,7 +301,7 @@ func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error { case BOOLEAN: switch t := d.(type) { case *bool: - *t = bool(valueBytes[0] != 0) + *t = valueBytes[0] != 0 case *string: if valueBytes[0] != 0 { *t = "true" @@ -340,12 +352,37 @@ func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error { default: return fmt.Errorf("dest[%d] types must be *float64 or *string", i) } - case TEXT, STRING, BLOB: + case TEXT, STRING: switch t := d.(type) { + case *[]byte: + *t = valueBytes case *string: *t = string(valueBytes) default: - return fmt.Errorf("dest[%d] types must be *string", i) + return fmt.Errorf("dest[%d] types must be *[]byte or *string", i) + } + case BLOB: + switch t := d.(type) { + case *[]byte: + *t = valueBytes + case *string: + *t = bytesToHexString(valueBytes) + default: + return fmt.Errorf("dest[%d] types must be *[]byte or *string", i) + } + case DATE: + switch t := d.(type) { + case *time.Time: + *t, _ = bytesToDate(valueBytes) + case *string: + *t = int32ToString(bytesToInt32(valueBytes)) + date, err := bytesToDate(valueBytes) + if err != nil { + *t = "" + } + *t = date.Format("2006-01-02") + default: + return fmt.Errorf("dest[%d] types must be *time.Time or *string", i) } default: return nil @@ -420,7 +457,7 @@ func (s *IoTDBRpcDataSet) hasCachedResults() bool { if s.closed { return false } - return (s.queryDataSet != nil && len(s.queryDataSet.Time) > 0) + return s.queryDataSet != nil && len(s.queryDataSet.Time) > 0 } func (s *IoTDBRpcDataSet) next() (bool, error) { diff --git a/client/session.go b/client/session.go index 9c209ad..288faf0 100644 --- a/client/session.go +++ b/client/session.go @@ -980,7 +980,7 @@ func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, error) binary.Write(buff, binary.BigEndian, int32(size)) binary.Write(buff, binary.BigEndian, s) default: - return nil, fmt.Errorf("values[%d] %v(%v) must be string", i, v, reflect.TypeOf(v)) + return nil, fmt.Errorf("values[%d] %v(%v) must be string or []byte", i, v, reflect.TypeOf(v)) } case BLOB: switch s := v.(type) { @@ -989,10 +989,21 @@ func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, error) binary.Write(buff, binary.BigEndian, int32(size)) binary.Write(buff, binary.BigEndian, s) default: - return nil, fmt.Errorf("values[%d] %v(%v) must be string", i, v, reflect.TypeOf(v)) + return nil, fmt.Errorf("values[%d] %v(%v) must be []byte", i, v, reflect.TypeOf(v)) + } + case DATE: + switch s := v.(type) { + case time.Time: + date, err := dateToInt32(s) + if err != nil { + return nil, err + } + binary.Write(buff, binary.BigEndian, date) + default: + return nil, fmt.Errorf("values[%d] %v(%v) must be time.Time", i, v, reflect.TypeOf(v)) } default: - return nil, fmt.Errorf("types[%d] is incorrect, it must in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT)", i) + return nil, fmt.Errorf("types[%d] is incorrect, it must in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP, BLOB, DATE, STRING)", i) } } return buff.Bytes(), nil diff --git a/client/tablet.go b/client/tablet.go index cda0f99..e5d7daa 100644 --- a/client/tablet.go +++ b/client/tablet.go @@ -25,6 +25,7 @@ import ( "fmt" "reflect" "sort" + "time" ) type MeasurementSchema struct { @@ -52,7 +53,7 @@ func (t *Tablet) Swap(i, j int) { case BOOLEAN: sortedSlice := t.values[index].([]bool) sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i] - case INT32: + case INT32, DATE: sortedSlice := t.values[index].([]int32) sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i] case INT64, TIMESTAMP: @@ -189,6 +190,18 @@ func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error default: return fmt.Errorf("illegal argument value %v %v", value, reflect.TypeOf(value)) } + case DATE: + values := t.values[columnIndex].([]int32) + switch v := value.(type) { + case time.Time: + val, err := dateToInt32(v) + if err != nil { + return err + } + values[rowIndex] = val + default: + return fmt.Errorf("illegal argument value %v %v", value, reflect.TypeOf(value)) + } } return nil } @@ -226,6 +239,8 @@ func (t *Tablet) GetValueAt(columnIndex, rowIndex int) (interface{}, error) { return string(t.values[columnIndex].([][]byte)[rowIndex]), nil case BLOB: return t.values[columnIndex].([][]byte)[rowIndex], nil + case DATE: + return int32ToDate(t.values[columnIndex].([]int32)[rowIndex]) default: return nil, fmt.Errorf("illegal datatype %v", schema.DataType) } @@ -259,7 +274,7 @@ func (t *Tablet) getValuesBytes() ([]byte, error) { switch schema.DataType { case BOOLEAN: binary.Write(buff, binary.BigEndian, t.values[i].([]bool)[0:t.RowSize]) - case INT32: + case INT32, DATE: binary.Write(buff, binary.BigEndian, t.values[i].([]int32)[0:t.RowSize]) case INT64, TIMESTAMP: binary.Write(buff, binary.BigEndian, t.values[i].([]int64)[0:t.RowSize]) @@ -310,7 +325,7 @@ func NewTablet(deviceId string, measurementSchemas []*MeasurementSchema, maxRowN switch schema.DataType { case BOOLEAN: tablet.values[i] = make([]bool, maxRowNumber) - case INT32: + case INT32, DATE: tablet.values[i] = make([]int32, maxRowNumber) case INT64, TIMESTAMP: tablet.values[i] = make([]int64, maxRowNumber) diff --git a/client/utils.go b/client/utils.go index fc835dc..692e9a5 100644 --- a/client/utils.go +++ b/client/utils.go @@ -22,9 +22,11 @@ package client import ( "bytes" "encoding/binary" + "errors" "fmt" "github.com/apache/iotdb-client-go/common" "strconv" + "time" ) func int32ToString(n int32) string { @@ -56,20 +58,20 @@ func int64ToBytes(n int64) []byte { } func bytesToInt32(bys []byte) int32 { - bytebuff := bytes.NewBuffer(bys) + bytesBuffer := bytes.NewBuffer(bys) var data int32 - binary.Read(bytebuff, binary.BigEndian, &data) - return int32(data) + binary.Read(bytesBuffer, binary.BigEndian, &data) + return data } func bytesToInt64(bys []byte) int64 { - bytebuff := bytes.NewBuffer(bys) + bytesBuffer := bytes.NewBuffer(bys) var data int64 - binary.Read(bytebuff, binary.BigEndian, &data) - return int64(data) + binary.Read(bytesBuffer, binary.BigEndian, &data) + return data } -func parseBytesToString(input []byte) string { +func bytesToHexString(input []byte) string { hexString := "0x" if input != nil { for _, b := range input { @@ -79,6 +81,40 @@ func parseBytesToString(input []byte) string { return hexString } +func dateToInt32(localDate time.Time) (int32, error) { + if localDate.IsZero() { + return 0, errors.New("date expression is null or empty") + } + + year := localDate.Year() + if year < 1000 || year > 9999 { + return 0, errors.New("year must be between 1000 and 9999") + } + + // Convert to YYYY/MM/DD format + result := year*10000 + int(localDate.Month())*100 + localDate.Day() + return int32(result), nil +} + +func int32ToDate(val int32) (time.Time, error) { + date := int(val) + year := date / 10000 + month := (date / 100) % 100 + day := date % 100 + + localDate := time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC) + + if localDate.Year() != year || int(localDate.Month()) != month || localDate.Day() != day { + return time.Time{}, errors.New("invalid date format") + } + + return localDate, nil +} + +func bytesToDate(bys []byte) (time.Time, error) { + return int32ToDate(bytesToInt32(bys)) +} + func verifySuccesses(statuses []*common.TSStatus) error { buff := bytes.Buffer{} for _, status := range statuses { @@ -86,8 +122,8 @@ func verifySuccesses(statuses []*common.TSStatus) error { buff.WriteString(*status.Message + ";") } } - errMsgs := buff.String() - if len(errMsgs) > 0 { + errMsg := buff.String() + if len(errMsg) > 0 { return NewBatchError(statuses) } return nil @@ -106,9 +142,9 @@ func VerifySuccess(status *common.TSStatus) error { } if status.Code != SuccessStatus { if status.Message != nil { - return fmt.Errorf("Error Code: %d, Message: %v", status.Code, *status.Message) + return fmt.Errorf("error code: %d, message: %v", status.Code, *status.Message) } else { - return fmt.Errorf("Error Code: %d", status.Code) + return fmt.Errorf("error code: %d", status.Code) } } return nil diff --git a/example/session_example.go b/example/session_example.go index fbfba4f..4777486 100644 --- a/example/session_example.go +++ b/example/session_example.go @@ -227,13 +227,13 @@ func printDataSet0(sessionDataSet *client.SessionDataSet) { fmt.Print(sessionDataSet.GetBool(columnName)) case client.INT32: fmt.Print(sessionDataSet.GetInt32(columnName)) - case client.INT64: + case client.INT64, client.TIMESTAMP: fmt.Print(sessionDataSet.GetInt64(columnName)) case client.FLOAT: fmt.Print(sessionDataSet.GetFloat(columnName)) case client.DOUBLE: fmt.Print(sessionDataSet.GetDouble(columnName)) - case client.TEXT: + case client.TEXT, client.STRING, client.BLOB, client.DATE: fmt.Print(sessionDataSet.GetText(columnName)) default: }