From 5df8640d6f7945b971b4ff3974e2c5b1e49e84d1 Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 29 Apr 2024 16:48:46 +0800 Subject: [PATCH 1/4] Support nil in Tablet --- client/bitmap.go | 61 ++++++++++++++++++++++++++++ client/tablet.go | 34 +++++++++++++--- client/tablet_test.go | 83 ++++++++++++++++++++++++++++++++++++++ example/session_example.go | 6 ++- test/e2e/e2e_test.go | 73 +++++++++++++++++++++++++++++++++ 5 files changed, 250 insertions(+), 7 deletions(-) create mode 100644 client/bitmap.go diff --git a/client/bitmap.go b/client/bitmap.go new file mode 100644 index 0000000..825a995 --- /dev/null +++ b/client/bitmap.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package client + +type BitMap struct { + size int + bits []byte +} + +var BitUtil = []byte{1, 2, 4, 8, 16, 32, 64, 128} + +func NewBitMap(size int) *BitMap { + bitMap := &BitMap{ + size: size, + bits: make([]byte, (size+7)/8), + } + return bitMap +} + +func (b *BitMap) Mark(position int) { + b.bits[position/8] |= BitUtil[position%8] +} + +func (b *BitMap) IsMarked(position int) bool { + return (b.bits[position/8] & BitUtil[position%8]) != 0 +} + +func (b *BitMap) IsAllUnmarked() bool { + for i := 0; i < b.size/8; i++ { + if b.bits[i] != 0 { + return false + } + } + for i := 0; i < b.size%8; i++ { + if (b.bits[b.size/8] & BitUtil[i]) != 0 { + return false + } + } + return true +} + +func (b *BitMap) GetBits() []byte { + return b.bits +} diff --git a/client/tablet.go b/client/tablet.go index 3b62938..e150c68 100644 --- a/client/tablet.go +++ b/client/tablet.go @@ -22,7 +22,6 @@ package client import ( "bytes" "encoding/binary" - "errors" "fmt" "reflect" "sort" @@ -38,6 +37,7 @@ type Tablet struct { measurementSchemas []*MeasurementSchema timestamps []int64 values []interface{} + bitMaps []*BitMap maxRowNumber int RowSize int } @@ -81,18 +81,27 @@ func (t *Tablet) SetTimestamp(timestamp int64, rowIndex int) { } func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error { - if value == nil { - return errors.New("illegal argument value can't be nil") - } if columnIndex < 0 || columnIndex > len(t.measurementSchemas) { return fmt.Errorf("illegal argument columnIndex %d", columnIndex) } - if rowIndex < 0 || rowIndex > int(t.maxRowNumber) { + if rowIndex < 0 || rowIndex > t.maxRowNumber { return fmt.Errorf("illegal argument rowIndex %d", rowIndex) } + if value == nil { + // Init the bitMap to mark nil value + if t.bitMaps == nil { + t.bitMaps = make([]*BitMap, len(t.values)) + } + if t.bitMaps[columnIndex] == nil { + t.bitMaps[columnIndex] = NewBitMap(t.maxRowNumber) + } + // Mark the nil value position + t.bitMaps[columnIndex].Mark(rowIndex) + } + switch t.measurementSchemas[columnIndex].DataType { case BOOLEAN: values := t.values[columnIndex].([]bool) @@ -167,11 +176,15 @@ func (t *Tablet) GetValueAt(columnIndex, rowIndex int) (interface{}, error) { return nil, fmt.Errorf("illegal argument columnIndex %d", columnIndex) } - if rowIndex < 0 || rowIndex > int(t.maxRowNumber) { + if rowIndex < 0 || rowIndex > t.maxRowNumber { return nil, fmt.Errorf("illegal argument rowIndex %d", rowIndex) } schema := t.measurementSchemas[columnIndex] + + if t.bitMaps != nil && t.bitMaps[columnIndex] != nil && t.bitMaps[columnIndex].IsMarked(rowIndex) { + return nil, nil + } switch schema.DataType { case BOOLEAN: return t.values[columnIndex].([]bool)[rowIndex], nil @@ -235,6 +248,15 @@ func (t *Tablet) getValuesBytes() ([]byte, error) { return nil, fmt.Errorf("illegal datatype %v", schema.DataType) } } + if t.bitMaps != nil { + for _, bitMap := range t.bitMaps { + columnHasNil := bitMap != nil && !bitMap.IsAllUnmarked() + binary.Write(buff, binary.BigEndian, columnHasNil) + if columnHasNil { + binary.Write(buff, binary.BigEndian, bitMap.GetBits()[0:t.RowSize/8+1]) + } + } + } return buff.Bytes(), nil } diff --git a/client/tablet_test.go b/client/tablet_test.go index d6e34d0..1b7579b 100644 --- a/client/tablet_test.go +++ b/client/tablet_test.go @@ -342,6 +342,89 @@ func TestTablet_GetValueAt(t *testing.T) { } } +func TestTablet_GetNilValueAt(t *testing.T) { + type args struct { + columnIndex int + rowIndex int + } + tests := []struct { + name string + args args + want interface{} + wantErr bool + }{ + { + name: "INT32", + args: args{ + columnIndex: 0, + rowIndex: 0, + }, + want: int32(256), + wantErr: false, + }, { + name: "FLOAT64", + args: args{ + columnIndex: 1, + rowIndex: 0, + }, + want: nil, + wantErr: false, + }, { + name: "INT64", + args: args{ + columnIndex: 2, + rowIndex: 0, + }, + want: int64(65535), + wantErr: false, + }, { + name: "FLOAT32", + args: args{ + columnIndex: 3, + rowIndex: 0, + }, + want: float32(36.5), + wantErr: false, + }, { + name: "STRING", + args: args{ + columnIndex: 4, + rowIndex: 0, + }, + want: "Hello World!", + wantErr: false, + }, { + name: "BOOLEAN", + args: args{ + columnIndex: 5, + rowIndex: 0, + }, + want: true, + wantErr: false, + }, + } + if tablet, err := createTablet(1); err == nil { + tablet.SetValueAt(int32(256), 0, 0) + tablet.SetValueAt(nil, 1, 0) + tablet.SetValueAt(int64(65535), 2, 0) + tablet.SetValueAt(float32(36.5), 3, 0) + tablet.SetValueAt("Hello World!", 4, 0) + tablet.SetValueAt(true, 5, 0) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tablet.GetValueAt(tt.args.columnIndex, tt.args.rowIndex) + if (err != nil) != tt.wantErr { + t.Errorf("Tablet.GetValueAt() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("Tablet.GetValueAt() = %v, want %v", got, tt.want) + } + }) + } + } +} + func TestTablet_Sort(t *testing.T) { tests := []struct { diff --git a/example/session_example.go b/example/session_example.go index 9d69ed7..ad0b244 100644 --- a/example/session_example.go +++ b/example/session_example.go @@ -531,7 +531,11 @@ func createTablet(rowCount int) (*client.Tablet, error) { ts++ tablet.SetTimestamp(ts, row) tablet.SetValueAt(rand.Int31(), 0, row) - tablet.SetValueAt(rand.Float64(), 1, row) + if row%2 == 1 { + tablet.SetValueAt(rand.Float64(), 1, row) + } else { + tablet.SetValueAt(nil, 1, row) + } tablet.SetValueAt(rand.Int63(), 2, row) tablet.SetValueAt(rand.Float32(), 3, row) tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 77afce7..649455d 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -236,6 +236,79 @@ func (s *e2eTestSuite) Test_InsertAlignedTablet() { assert.Equal(status, "12") s.session.DeleteStorageGroup("root.ln.**") } + +func (s *e2eTestSuite) Test_InsertAlignedTabletWithNilValue() { + var timeseries = []string{"root.ln.device1.**"} + s.session.DeleteTimeseries(timeseries) + if tablet, err := createTabletWithNil(12); err == nil { + status, err := s.session.InsertAlignedTablet(tablet, false) + s.checkError(status, err) + tablet.Reset() + } else { + log.Fatal(err) + } + var timeout int64 = 1000 + ds, err := s.session.ExecuteQueryStatement("select count(status) from root.ln.device1", &timeout) + assert := s.Require() + assert.NoError(err) + defer ds.Close() + assert.True(ds.Next()) + var status string + assert.NoError(ds.Scan(&status)) + assert.Equal(status, "12") + s.session.DeleteStorageGroup("root.ln.**") +} + +func createTabletWithNil(rowCount int) (*client.Tablet, error) { + tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{ + { + Measurement: "restart_count", + DataType: client.INT32, + }, { + Measurement: "price", + DataType: client.DOUBLE, + }, { + Measurement: "tick_count", + DataType: client.INT64, + }, { + Measurement: "temperature", + DataType: client.FLOAT, + }, { + Measurement: "description", + DataType: client.TEXT, + }, + { + Measurement: "status", + DataType: client.BOOLEAN, + }, + }, rowCount) + + if err != nil { + return nil, err + } + ts := time.Now().UTC().UnixNano() / 1000000 + for row := 0; row < int(rowCount); row++ { + ts++ + tablet.SetTimestamp(ts, row) + tablet.SetValueAt(rand.Int31(), 0, row) + if row%2 == 1 { + tablet.SetValueAt(rand.Float64(), 1, row) + } else { + tablet.SetValueAt(nil, 1, row) + } + tablet.SetValueAt(rand.Int63(), 2, row) + if row%3 == 1 { + tablet.SetValueAt(rand.Float32(), 3, row) + } else { + tablet.SetValueAt(nil, 3, row) + } + tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row) + tablet.SetValueAt(bool(ts%2 == 0), 5, row) + tablet.RowSize++ + } + return tablet, nil +} + func createTablet(rowCount int) (*client.Tablet, error) { tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{ { From 920b23133974f80285d7fa42207fca841493eba7 Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 29 Apr 2024 17:08:46 +0800 Subject: [PATCH 2/4] add sort bitmap --- .github/workflows/go.yml | 2 +- client/bitmap.go | 14 ++++++++++++++ client/tablet.go | 18 ++++++++++++++++++ 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 0f675c3..b6a8d43 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -25,7 +25,7 @@ jobs: strategy: matrix: os: [macos-latest, ubuntu-latest, windows-latest] - go: ['1.13', 'stable'] + go: ['1.14', 'stable'] steps: - name: Set up Go ${{ matrix.go }} diff --git a/client/bitmap.go b/client/bitmap.go index 825a995..b82d121 100644 --- a/client/bitmap.go +++ b/client/bitmap.go @@ -25,6 +25,16 @@ type BitMap struct { } var BitUtil = []byte{1, 2, 4, 8, 16, 32, 64, 128} +var UnmarkBitUtil = []byte{ + 0xFE, // 11111110 + 0xFD, // 11111101 + 0xFB, // 11111011 + 0xF7, // 11110111 + 0xEF, // 11101111 + 0xDF, // 11011111 + 0xBF, // 10111111 + 0x7F, // 01111111 +} func NewBitMap(size int) *BitMap { bitMap := &BitMap{ @@ -38,6 +48,10 @@ func (b *BitMap) Mark(position int) { b.bits[position/8] |= BitUtil[position%8] } +func (b *BitMap) UnMark(position int) { + b.bits[position/8] &= UnmarkBitUtil[position%8] +} + func (b *BitMap) IsMarked(position int) bool { return (b.bits[position/8] & BitUtil[position%8]) != 0 } diff --git a/client/tablet.go b/client/tablet.go index e150c68..7fe2e97 100644 --- a/client/tablet.go +++ b/client/tablet.go @@ -69,6 +69,24 @@ func (t *Tablet) Swap(i, j int) { sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i] } } + if t.bitMaps != nil { + for _, bitMap := range t.bitMaps { + if bitMap != nil { + isNilI := bitMap.IsMarked(i) + isNilJ := bitMap.IsMarked(j) + if isNilI { + bitMap.Mark(j) + } else { + bitMap.UnMark(j) + } + if isNilJ { + bitMap.Mark(i) + } else { + bitMap.UnMark(i) + } + } + } + } t.timestamps[i], t.timestamps[j] = t.timestamps[j], t.timestamps[i] } From 0ef0ceac28e716843d7fc1aa9e4131a634efcd46 Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 29 Apr 2024 17:17:07 +0800 Subject: [PATCH 3/4] fix test --- .github/workflows/go.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index b6a8d43..e7af9fe 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -25,7 +25,10 @@ jobs: strategy: matrix: os: [macos-latest, ubuntu-latest, windows-latest] - go: ['1.14', 'stable'] + go: ['1.13', 'stable'] + exclude: + - os: macos-latest + go: '1.13' steps: - name: Set up Go ${{ matrix.go }} From cdeeeca879819c9e897e5c336a199bb3c5db4435 Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 29 Apr 2024 20:11:07 +0800 Subject: [PATCH 4/4] fix reset --- client/tablet.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/tablet.go b/client/tablet.go index 7fe2e97..02528e9 100644 --- a/client/tablet.go +++ b/client/tablet.go @@ -285,6 +285,7 @@ func (t *Tablet) Sort() error { func (t *Tablet) Reset() { t.RowSize = 0 + t.bitMaps = nil } func NewTablet(deviceId string, measurementSchemas []*MeasurementSchema, maxRowNumber int) (*Tablet, error) {